Skip to content

Commit

Permalink
Make handlers Deferrable-aware
Browse files Browse the repository at this point in the history
If a handler returns an EM::Deferrable, wait for the EM result to be ready
before considering the handler complete, raising an error result as an
exception in the current context just like a non-async handler would have done.
This allows async filters/handlers to run in order.

To be able to sync, handle_data is wrapped in a Fiber.  To aid in testing or
other cases that want to wait until the whole process is done, handle_data
returns an EM::Deferrable that will signal the success or failure of the whole
process (for both the sync and async cases).  This can be safely ignored by most
callers.
  • Loading branch information
singpolyma committed Nov 10, 2022
1 parent 03f9cda commit f78d56e
Showing 1 changed file with 39 additions and 6 deletions.
45 changes: 39 additions & 6 deletions lib/blather/client/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,23 @@ def receive_data(stanza)
end

def handle_data(stanza)
catch(:halt) do
run_filters :before, stanza
handle_stanza stanza
run_filters :after, stanza
end
df = EM::DefaultDeferrable.new

Fiber.new {
begin
catch(:halt) do
run_filters :before, stanza
handle_stanza stanza
run_filters :after, stanza
end
df.succeed
rescue => e
df.fail(e) # Fail in case anyone is listening
raise e # But still raise so it's not a silent failure
end
}.resume

df
end

# @private
Expand Down Expand Up @@ -303,7 +315,7 @@ def call_handler_for(type, stanza)
end

def call_handler(handler, guards, stanza)
if guards.first.respond_to?(:to_str)
result = if guards.first.respond_to?(:to_str)
found = stanza.find(*guards)
throw :pass if found.empty?

Expand All @@ -313,6 +325,27 @@ def call_handler(handler, guards, stanza)

handler.call(stanza)
end

return result unless result.is_a?(EM::Deferrable)

unless EM.reactor_thread == Thread.current
raise "Cannot sync EM::Deferrable across threads. " \
"Did you forget to setup with async: true?"
end

fiber = Fiber.current
EM.next_tick do
result.callback(&fiber.method(:resume))
result.errback do |e|
if e.is_a?(Exception)
fiber.raise(e)
else
fiber.raise(e.to_s)
end
end
end

Fiber.yield
end

# If any of the guards returns FALSE this returns true
Expand Down

0 comments on commit f78d56e

Please sign in to comment.