Skip to content

Commit

Permalink
Add a proper interface for enqueuing events.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Sep 25, 2024
1 parent ae3a358 commit ef9227c
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 12 deletions.
2 changes: 1 addition & 1 deletion lib/live/element.rb
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def handle(event)
def rpc(*arguments)
if @page
# This update might not be sent right away. Therefore, mutable arguments may be serialized to JSON at a later time (or never). This could be a race condition:
@page.updates.enqueue(arguments)
@page.enqueue(arguments)
else
# This is a programming error, as it probably means the element is still part of the logic of the server side (e.g. async loop), but it is not bound to a page, so there is nothing to update/access/rpc.
raise PageError, "Element is not bound to a page, make sure to implement #close!"
Expand Down
19 changes: 8 additions & 11 deletions lib/live/page.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@ def initialize(resolver)
@updates = Async::Queue.new
end

# The queue of outstanding events to be sent to the client.
attr :updates

# Bind a client-side element to a server side element.
# @parameter element [Live::Element] The element to bind.
def bind(element)
Expand Down Expand Up @@ -84,6 +81,10 @@ def close
end
end

def enqueue(update)
@updates.enqueue(::Protocol::WebSocket::TextMessage.generate(update))
end

# Process a single incoming message from the network.
def process_message(message)
case message[0]
Expand All @@ -93,15 +94,15 @@ def process_message(message)
self.bind(element)
else
Console.warn(self, "Could not resolve element:", message)
@updates.enqueue(["error", message[1], "Could not resolve element!"])
self.enqueue(["error", message[1], "Could not resolve element!"])
end
when "unbind"
# Unbind a client-side element from a server-side element.
if element = @elements.delete(message[1])
element.close unless @attached.key?(message[1])
else
Console.warn(self, "Could not unbind element:", message)
@updates.enqueue(["error", message[1], "Could not unbind element!"])
self.enqueue(["error", message[1], "Could not unbind element!"])
end
when "event"
# Handle an event from the client.
Expand All @@ -119,11 +120,7 @@ def run(connection, keep_alive: 10)

queue_task = task.async do
while update = @updates.dequeue
if update == :ping
connection.send_ping
else
::Protocol::WebSocket::TextMessage.generate(update).send(connection)
end
update.send(connection)

# Flush the output if there are no more updates:
if @updates.empty?
Expand All @@ -140,7 +137,7 @@ def run(connection, keep_alive: 10)

# We synchronize all writes to the update queue:
if duration > keep_alive
@updates.enqueue(:ping)
@updates.enqueue(::Protocol::WebSocket::PingMessage.new)
end
end
end
Expand Down
1 change: 1 addition & 0 deletions live.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,6 @@ Gem::Specification.new do |spec|
spec.required_ruby_version = ">= 3.1"

spec.add_dependency "async-websocket", "~> 0.27"
spec.add_dependency "protocol-websocket", "~> 0.19"
spec.add_dependency "xrb"
end

0 comments on commit ef9227c

Please sign in to comment.