diff --git a/anyt-core.gemspec b/anyt-core.gemspec index edfce15..590175a 100644 --- a/anyt-core.gemspec +++ b/anyt-core.gemspec @@ -22,6 +22,6 @@ Gem::Specification.new do |spec| spec.add_dependency "rails", ">= 6.0" spec.add_dependency "anyway_config", ">= 2.2.0" spec.add_dependency "websocket", "~> 1.2.4" - spec.add_dependency "websocket-client-simple", "~> 0.3.0" + spec.add_dependency "websocket-client-simple", "~> 0.8" spec.add_dependency "concurrent-ruby", "~> 1.0" end diff --git a/lib/anyt/client.rb b/lib/anyt/client.rb index edd9d8b..ed5fa6e 100644 --- a/lib/anyt/client.rb +++ b/lib/anyt/client.rb @@ -9,17 +9,38 @@ class Client require "websocket-client-simple" require "concurrent" - class TimeoutError < StandardError; end + class Error < StandardError; end + + class TimeoutError < Error; end + + class DisconnectedError < Error + attr_reader :event + + def initialize(event) + @event = event + if event + super("WebSocket disconnected (code=#{event.code}, reason=#{event.data})") + else + super("WebSocket disconnected abnormally") + end + end + end WAIT_WHEN_EXPECTING_EVENT = 5 WAIT_WHEN_NOT_EXPECTING_EVENT = 0.5 + private attr_reader :logger + + attr_reader :url + def initialize( ignore: [], url: Anyt.config.target_url, qs: "", cookies: "", headers: {}, protocol: "actioncable-v1-json", - timeout_multiplier: Anyt.config.timeout_multiplier + timeout_multiplier: Anyt.config.timeout_multiplier, + logger: AnyCable.logger ) + @logger = logger ignore_message_types = @ignore_message_types = ignore messages = @messages = Queue.new closed = @closed = Concurrent::Event.new @@ -32,8 +53,15 @@ def initialize( open = Concurrent::Promise.new + @url = url + + if !qs.empty? + @url += @url.include?("?") ? "&" : "?" + @url += qs + end + @ws = WebSocket::Client::Simple.connect( - url + "?#{qs}", + @url, headers: headers ) do |ws| ws.on(:error) do |event| @@ -54,13 +82,15 @@ def initialize( ws.on(:message) do |event| next if event.type == :ping if event.type == :close + messages << DisconnectedError.new(event) + has_messages.release closed.set else message = JSON.parse(event.data) next if ignore_message_types.include?(message["type"]) - AnyCable.logger.debug "Message received: #{message}" + logger.debug "Message received: #{message}" messages << message has_messages.release @@ -81,11 +111,13 @@ def initialize( def receive(timeout: WAIT_WHEN_EXPECTING_EVENT) timeout *= @timeout_multiplier - raise TimeoutError, "Timed out to receive message" unless - @has_messages.try_acquire(1, timeout) + unless @has_messages.try_acquire(1, timeout) + raise DisconnectedError if closed? + raise TimeoutError, "Timed out to receive message" + end msg = @messages.pop(true) - raise msg if msg.is_a?(Exception) + raise msg if msg.is_a?(Exception) || msg.is_a?(Error) msg end