Skip to content

Commit

Permalink
Merge branch 'trunk' into sftp_protocol_proposal
Browse files Browse the repository at this point in the history
  • Loading branch information
saikumar9 authored Nov 26, 2024
2 parents 36b12da + 6d3acdb commit fdc6fff
Show file tree
Hide file tree
Showing 35 changed files with 649 additions and 145 deletions.
24 changes: 24 additions & 0 deletions .github/workflows/downstream.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
name: Downstream

on:
workflow_dispatch:
branches:
- trunk
push:
branches:
- trunk

jobs:
check-dependencies:
runs-on: ubuntu-latest
steps:
- name: Bundle Dependency Checks
uses: convictional/[email protected]
with:
owner: ideacrew
repo: ic_dependency_jamboree
github_token: ${{ secrets.GH_PAT }}
workflow_file_name: test_bundle.yml
ref: trunk
propagate_failure: false

2 changes: 1 addition & 1 deletion .github/workflows/rspec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ jobs:
strategy:
fail-fast: false
matrix:
ruby_version: ['2.6.3', '2.7.5', '3.0.5', '3.1.4', '3.2.2']
ruby_version: ['2.7.5', '3.0.5', '3.1.4', '3.2.2']
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
Expand Down
1 change: 1 addition & 0 deletions .rspec
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
--format documentation
--color
--require spec_helper
--exclude-pattern "spec/rails_app/**/*"
3 changes: 3 additions & 0 deletions .rspec_rails_specs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
--format documentation
--color
--exclude-pattern "**/*"
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ gemspec
group :development, :test do
gem "rails", '>= 6.1.4'
gem "rspec-rails"
gem "parallel_tests"
gem "pry", platform: :mri, require: false
gem "pry-byebug", platform: :mri, require: false
gem 'rubocop'
Expand Down
28 changes: 23 additions & 5 deletions lib/event_source.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
require 'event_source/operations/fetch_session'
require 'event_source/operations/build_message_options'
require 'event_source/operations/build_message'
require 'event_source/boot_registry'

# Event source provides ability to compose, publish and subscribe to events
module EventSource
Expand All @@ -64,14 +65,23 @@ class << self
:async_api_schemas=

def configure
@configured = true
yield(config)
end

def initialize!
load_protocols
create_connections
load_async_api_resources
load_components
def initialize!(force = false)
# Don't boot if I was never configured.
return unless @configured
boot_registry.boot!(force) do
load_protocols
create_connections
load_async_api_resources
load_components
end
end

def boot_registry
@boot_registry ||= EventSource::BootRegistry.new
end

def config
Expand All @@ -89,6 +99,14 @@ def build_async_api_resource(resource)
.call(resource)
.success
end

def register_subscriber(subscriber_klass)
boot_registry.register_subscriber(subscriber_klass)
end

def register_publisher(subscriber_klass)
boot_registry.register_publisher(subscriber_klass)
end
end

class EventSourceLogger
Expand Down
96 changes: 96 additions & 0 deletions lib/event_source/boot_registry.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# frozen_string_literal: true

require 'set'
require 'monitor'

module EventSource
# This class manages correct/loading of subscribers and publishers
# based on the current stage of the EventSource lifecycle.
#
# Depending on both the time the initialization of EventSource is invoked
# and when subscriber/publisher code is loaded, this can become complicated.
# This is largely caused by two confounding factors:
# 1. We want to delay initialization of EventSource until Rails is fully
# 'ready'
# 2. Based on the Rails environment, such as production, development, or
# test (primarily how those different environments treat lazy vs. eager
# loading of classes in a Rails application), subscriber and publisher
# code can be loaded before, after, or sometimes even DURING the
# EventSource boot process - we need to support all models
class BootRegistry
def initialize
@unbooted_publishers = Set.new
@unbooted_subscribers = Set.new
@booted_publishers = Set.new
@booted_subscribers = Set.new
# This is our re-entrant mutex. We're going to use it to make sure that
# registration and boot methods aren't allowed to simultaneously alter
# our state. You'll notice most methods on this class are wrapped in
# synchronize calls against this.
@bootex = Monitor.new
@booted = false
end

def boot!(force = false)
@bootex.synchronize do
return if @booted && !force
yield
boot_publishers!
boot_subscribers!
@booted = true
end
end

# Register a publisher for EventSource.
#
# If the EventSource hasn't been booted, save publisher for later.
# Otherwise, boot it now.
def register_publisher(publisher_klass)
@bootex.synchronize do
if @booted
publisher_klass.validate
@booted_publishers << publisher_klass
else
@unbooted_publishers << publisher_klass
end
end
end

# Register a subscriber for EventSource.
#
# If the EventSource hasn't been booted, save the subscriber for later.
# Otherwise, boot it now.
def register_subscriber(subscriber_klass)
@bootex.synchronize do
if @booted
subscriber_klass.create_subscription
@booted_subscribers << subscriber_klass
else
@unbooted_subscribers << subscriber_klass
end
end
end

# Boot the publishers.
def boot_publishers!
@bootex.synchronize do
@unbooted_publishers.each do |pk|
pk.validate
@booted_publishers << pk
end
@unbooted_publishers = Set.new
end
end

# Boot the subscribers.
def boot_subscribers!
@bootex.synchronize do
@unbooted_subscribers.each do |sk|
sk.create_subscription
@booted_subscribers << sk
end
@unbooted_subscribers = Set.new
end
end
end
end
70 changes: 24 additions & 46 deletions lib/event_source/protocols/amqp/bunny_queue_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -117,41 +117,6 @@ def convert_subscriber_prefetch(options)

def resolve_subscriber_routing_keys(channel, operation); end

# def register_subscription(subscriber_klass, bindings)
# consumer_proxy = consumer_proxy_for(bindings)

# consumer_proxy.on_delivery do |delivery_info, metadata, payload|
# on_receive_message(
# subscriber_klass,
# delivery_info,
# metadata,
# payload
# )
# end

# subscribe_consumer(consumer_proxy)
# end

# def subscribe_consumer(consumer_proxy)
# @subject.subscribe_with(consumer_proxy)
# @consumers.push(consumer_proxy)
# end

# def consumer_proxy_for(bindings)
# operation_bindings = convert_to_consumer_options(bindings[:amqp])

# logger.debug 'consumer proxy options:'
# logger.debug operation_bindings.inspect

# BunnyConsumerProxy.new(
# @subject.channel,
# @subject,
# '',
# operation_bindings[:no_ack],
# operation_bindings[:exclusive]
# )
# end

def on_receive_message(
subscriber_klass,
delivery_info,
Expand All @@ -164,17 +129,7 @@ def on_receive_message(
logger.debug metadata.inspect
logger.debug payload.inspect

if delivery_info.routing_key
routing_key = [app_name, delivery_info.routing_key].join(delimiter)
executable = subscriber_klass.executable_for(routing_key)
end

unless executable
routing_key = [app_name, exchange_name].join(delimiter)
executable = subscriber_klass.executable_for(routing_key)
end

logger.debug "routing_key: #{routing_key}"
executable = find_executable(subscriber_klass, delivery_info)
return unless executable

subscriber = subscriber_klass.new
Expand All @@ -196,6 +151,13 @@ def on_receive_message(
subscriber = nil
end

def find_executable(subscriber_klass, delivery_info)
subscriber_suffix = subscriber_klass_name_to_suffix(subscriber_klass)

find_executable_for_routing_key(subscriber_klass, delivery_info, subscriber_suffix) ||
find_default_executable(subscriber_klass, subscriber_suffix)
end

def respond_to_missing?(name, include_private); end

# Forward all missing method calls to the Bunny::Queue instance
Expand All @@ -205,6 +167,22 @@ def method_missing(name, *args)

private

def subscriber_klass_name_to_suffix(subscriber_klass)
subscriber_klass.name.downcase.gsub("::", '_')
end

def find_executable_for_routing_key(subscriber_klass, delivery_info, subscriber_suffix)
return unless delivery_info.routing_key

routing_key = [app_name, delivery_info.routing_key].join(delimiter)
subscriber_klass.executable_for(routing_key + "_#{subscriber_suffix}")
end

def find_default_executable(subscriber_klass, subscriber_suffix)
default_routing_key = [app_name, exchange_name].join(delimiter)
subscriber_klass.executable_for(default_routing_key + "_#{subscriber_suffix}")
end

def delimiter
EventSource.delimiter(:amqp)
end
Expand Down
2 changes: 1 addition & 1 deletion lib/event_source/protocols/amqp_protocol.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
require_relative 'amqp/contracts/contract'

Gem
.find_files('event_source/protocols/amqp/contracts/**/*.rb')
.find_files('event_source/protocols/amqp/contracts/**/*.rb', false)
.sort
.each { |f| require(f) }

Expand Down
10 changes: 6 additions & 4 deletions lib/event_source/protocols/http/faraday_queue_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@ def actions
# @return [Queue] Queue instance
def subscribe(subscriber_klass, _options)
unique_key = [app_name, formatted_exchange_name].join(delimiter)
logger.debug "FaradayQueueProxy#register_subscription Subscriber Class #{subscriber_klass}"
logger.debug "FaradayQueueProxy#register_subscription Unique_key #{unique_key}"
executable = subscriber_klass.executable_for(unique_key)
@subject.actions.push(executable)
subscription_key = [app_name, formatted_exchange_name].join(delimiter)
subscriber_suffix = subscriber_klass.name.downcase.gsub('::', '_')
unique_key = subscription_key + "_#{subscriber_suffix}"
logger.info "FaradayQueueProxy#register_subscription Subscriber Class #{subscriber_klass}"
logger.info "FaradayQueueProxy#register_subscription Unique_key #{unique_key}"
@subject.register_action(subscriber_klass, unique_key)
end

def consumer_proxy_for(operation_bindings)
Expand Down
17 changes: 11 additions & 6 deletions lib/event_source/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ def self.publisher_container
@publisher_container ||= Concurrent::Map.new
end

def self.initialization_registry
@initialization_registry ||= Concurrent::Array.new
end

def self.initialize_publishers
self.initialization_registry.each do |pub|
pub.validate
end
end

def self.[](exchange_ref)
# TODO: validate publisher already exists
# raise EventSource::Error::PublisherAlreadyRegisteredError.new(id) if registry.key?(id)
Expand All @@ -46,12 +56,7 @@ def included(base)
}
base.extend(ClassMethods)

TracePoint.trace(:end) do |t|
if base == t.self
base.validate
t.disable
end
end
EventSource.register_publisher(base)
end

# methods to register events
Expand Down
16 changes: 13 additions & 3 deletions lib/event_source/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ class Queue
# @attr_reader [Object] queue_proxy the protocol-specific class supporting this DSL
# @attr_reader [String] name
# @attr_reader [Hash] bindings
# @attr_reader [Hash] actions
attr_reader :queue_proxy, :name, :bindings, :actions
attr_reader :queue_proxy, :name, :bindings

def initialize(queue_proxy, name, bindings = {})
@queue_proxy = queue_proxy
@name = name
@bindings = bindings
@subject = ::Queue.new
@actions = []
@registered_actions = []
end

# def subscribe(subscriber_klass, &block)
Expand Down Expand Up @@ -49,5 +48,16 @@ def close
def closed?
@subject.closed?
end

# Register an action to be performed, with a resolver class and key.
def register_action(resolver, key)
@registered_actions << [resolver, key]
end

def actions
@registered_actions.map do |ra|
ra.first.executable_for(ra.last)
end
end
end
end
Loading

0 comments on commit fdc6fff

Please sign in to comment.