Skip to content

Commit

Permalink
add support for ruby 3.0.x - fixes keypup-io#27
Browse files Browse the repository at this point in the history
  • Loading branch information
Arnaud Lachaume committed May 31, 2021
1 parent 092885e commit 05471e2
Show file tree
Hide file tree
Showing 23 changed files with 101 additions and 94 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ jobs:
ruby:
- '2.5.x'
- '2.6.x'
- '2.7.x'
- '3.0.x'
appraisal:
- 'google-cloud-tasks-1.0'
- 'google-cloud-tasks-1.1'
Expand Down
2 changes: 1 addition & 1 deletion examples/rails/.ruby-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
ruby-2.5.5
ruby-2.7.1
60 changes: 29 additions & 31 deletions examples/sinatra/app.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,35 +15,33 @@
end

post '/cloudtasker/run' do
begin
# Authenticate request
Cloudtasker::Authenticator.verify!(request.env['HTTP_AUTHORIZATION'].to_s.split(' ').last)

# Capture content and decode content
content = request.body.read
content = Base64.decode64(content) if request.env['HTTP_CONTENT_TRANSFER_ENCODING'].to_s.downcase == 'base64'

# Format job payload
payload = JSON.parse(content)
.merge(
job_retries: request.env[Cloudtasker::Config::RETRY_HEADER].to_i,
task_id: request.env[Cloudtasker::Config::TASK_ID_HEADER]
)

# Process payload
Cloudtasker::WorkerHandler.execute_from_payload!(payload)
return 204
rescue Cloudtasker::DeadWorkerError
# 205: job will NOT be retried
return 205
rescue Cloudtasker::AuthenticationError
# 401: Unauthorized
return 401
rescue Cloudtasker::InvalidWorkerError
# 404: Job will be retried
return 404
rescue StandardError
# 422: Job will be retried
return 423
end
# Authenticate request
Cloudtasker::Authenticator.verify!(request.env['HTTP_AUTHORIZATION'].to_s.split(' ').last)

# Capture content and decode content
content = request.body.read
content = Base64.decode64(content) if request.env['HTTP_CONTENT_TRANSFER_ENCODING'].to_s.downcase == 'base64'

# Format job payload
payload = JSON.parse(content)
.merge(
job_retries: request.env[Cloudtasker::Config::RETRY_HEADER].to_i,
task_id: request.env[Cloudtasker::Config::TASK_ID_HEADER]
)

# Process payload
Cloudtasker::WorkerHandler.execute_from_payload!(payload)
return 204
rescue Cloudtasker::DeadWorkerError
# 205: job will NOT be retried
return 205
rescue Cloudtasker::AuthenticationError
# 401: Unauthorized
return 401
rescue Cloudtasker::InvalidWorkerError
# 404: Job will be retried
return 404
rescue StandardError
# 422: Job will be retried
return 423
end
16 changes: 9 additions & 7 deletions lib/cloudtasker/backend/google_cloud_task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,30 @@ class GoogleCloudTask
#
# Create the queue configured in Cloudtasker if it does not already exist.
#
# @param [String] queue_name The relative name of the queue.
# @param [String] :name The queue name
# @param [Integer] :concurrency The queue concurrency
# @param [Integer] :retries The number of retries for the queue
#
# @return [Google::Cloud::Tasks::V2beta3::Queue] The queue
#
def self.setup_queue(**opts)
def self.setup_queue(name: nil, concurrency: nil, retries: nil)
# Build full queue path
queue_name = opts[:name] || Cloudtasker::Config::DEFAULT_JOB_QUEUE
queue_name = name || Cloudtasker::Config::DEFAULT_JOB_QUEUE
full_queue_name = queue_path(queue_name)

# Try to get existing queue
client.get_queue(full_queue_name)
rescue Google::Gax::RetryError
# Extract options
concurrency = (opts[:concurrency] || Cloudtasker::Config::DEFAULT_QUEUE_CONCURRENCY).to_i
retries = (opts[:retries] || Cloudtasker::Config::DEFAULT_QUEUE_RETRIES).to_i
queue_concurrency = (concurrency || Cloudtasker::Config::DEFAULT_QUEUE_CONCURRENCY).to_i
queue_retries = (retries || Cloudtasker::Config::DEFAULT_QUEUE_RETRIES).to_i

# Create queue on 'not found' error
client.create_queue(
client.location_path(config.gcp_project_id, config.gcp_location_id),
name: full_queue_name,
retry_config: { max_attempts: retries },
rate_limits: { max_concurrent_dispatches: concurrency }
retry_config: { max_attempts: queue_retries },
rate_limits: { max_concurrent_dispatches: queue_concurrency }
)
end

Expand Down
2 changes: 1 addition & 1 deletion lib/cloudtasker/backend/memory_task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def self.create(payload)
payload = payload.merge(schedule_time: payload[:schedule_time].to_i)

# Save task
task = new(payload.merge(id: id))
task = new(**payload.merge(id: id))
queue << task

# Execute task immediately if in testing and inline mode enabled
Expand Down
10 changes: 7 additions & 3 deletions lib/cloudtasker/backend/redis_task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def self.create(payload)
# Save job
redis.write(key(id), payload)
redis.sadd(key, id)
new(payload.merge(id: id))
new(**payload.merge(id: id))
end

#
Expand All @@ -103,7 +103,7 @@ def self.find(id)
gid = key(id)
return nil unless (payload = redis.fetch(gid))

new(payload.merge(id: id))
new(**payload.merge(id: id))
end

#
Expand Down Expand Up @@ -172,8 +172,12 @@ def gid
# Retry the task later.
#
# @param [Integer] interval The delay in seconds before retrying the task
# @param [Hash] opts Additional options
# @option opts [Boolean] :is_error Increase number of retries. Default to true.
#
def retry_later(interval, is_error: true)
def retry_later(interval, opts = {})
is_error = opts.to_h.fetch(:is_error, true)

redis.write(
gid,
retries: is_error ? retries + 1 : retries,
Expand Down
8 changes: 3 additions & 5 deletions lib/cloudtasker/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,9 @@ def setup_signals(write_pipe)
# USR1 and USR2 don't work on the JVM
sigs << 'USR2' unless jruby?
sigs.each do |sig|
begin
trap(sig) { write_pipe.puts(sig) }
rescue ArgumentError
puts "Signal #{sig} not supported"
end
trap(sig) { write_pipe.puts(sig) }
rescue ArgumentError
puts "Signal #{sig} not supported"
end
end

Expand Down
4 changes: 2 additions & 2 deletions lib/cloudtasker/cloud_task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def self.backend
#
def self.find(id)
payload = backend.find(id)&.to_h
payload ? new(payload) : nil
payload ? new(**payload) : nil
end

#
Expand All @@ -51,7 +51,7 @@ def self.create(payload)
raise MaxTaskSizeExceededError if payload.to_json.bytesize > Config::MAX_TASK_SIZE

resp = backend.create(payload)&.to_h
resp ? new(resp) : nil
resp ? new(**resp) : nil
end

#
Expand Down
10 changes: 5 additions & 5 deletions lib/cloudtasker/cron/schedule.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def self.create(**opts)
def self.find(id)
return nil unless (schedule_config = redis.fetch(key(id)))

new(schedule_config)
new(**schedule_config)
end

#
Expand Down Expand Up @@ -251,9 +251,9 @@ def next_time(*args)
#
# Buld edit the object attributes.
#
# @param [Hash] **opts The attributes to edit.
# @param [Hash] opts The attributes to edit.
#
def assign_attributes(**opts)
def assign_attributes(opts)
opts
.select { |k, _| instance_variables.include?("@#{k}".to_sym) }
.each { |k, v| instance_variable_set("@#{k}", v) }
Expand All @@ -262,9 +262,9 @@ def assign_attributes(**opts)
#
# Edit the object attributes and save the object in Redis.
#
# @param [Hash] **opts The attributes to edit.
# @param [Hash] opts The attributes to edit.
#
def update(**opts)
def update(opts)
assign_attributes(opts)
save
end
Expand Down
7 changes: 4 additions & 3 deletions lib/cloudtasker/redis_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,15 @@ def search(pattern)
# Delegate all methods to the redis client.
#
# @param [String, Symbol] name The method to delegate.
# @param [Array<any>] *args The list of method arguments.
# @param [Array<any>] *args The list of method positional arguments.
# @param [Hash<any>] *kwargs The list of method keyword arguments.
# @param [Proc] &block Block passed to the method.
#
# @return [Any] The method return value
#
def method_missing(name, *args, &block)
def method_missing(name, *args, **kwargs, &block)
if Redis.method_defined?(name)
client.with { |c| c.send(name, *args, &block) }
client.with { |c| c.send(name, *args, **kwargs, &block) }
else
super
end
Expand Down
5 changes: 3 additions & 2 deletions lib/cloudtasker/unique_job/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ class Job
# Build a new instance of the class.
#
# @param [Cloudtasker::Worker] worker The worker at hand
# @param [Hash] worker The worker options
#
def initialize(worker, **kwargs)
def initialize(worker, opts = {})
@worker = worker
@call_opts = kwargs
@call_opts = opts
end

#
Expand Down
1 change: 1 addition & 0 deletions lib/cloudtasker/unique_job/middleware/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
module Cloudtasker
module UniqueJob
module Middleware
# TODO: kwargs to job otherwise it won't get the time_at
# Client middleware, invoked when jobs are scheduled
class Client
def call(worker, **_kwargs)
Expand Down
6 changes: 3 additions & 3 deletions lib/cloudtasker/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def self.from_hash(hash)
return nil unless worker_klass.include?(self)

# Return instantiated worker
worker_klass.new(payload.slice(:job_queue, :job_args, :job_id, :job_meta, :job_retries, :task_id))
worker_klass.new(**payload.slice(:job_queue, :job_args, :job_id, :job_meta, :job_retries, :task_id))
rescue NameError
nil
end
Expand Down Expand Up @@ -121,7 +121,7 @@ def perform_at(time_at, *args)
# @return [Cloudtasker::CloudTask] The Google Task response
#
def schedule(args: nil, time_in: nil, time_at: nil, queue: nil)
new(job_args: args, job_queue: queue).schedule({ interval: time_in, time_at: time_at }.compact)
new(job_args: args, job_queue: queue).schedule(**{ interval: time_in, time_at: time_at }.compact)
end

#
Expand Down Expand Up @@ -239,7 +239,7 @@ def schedule_time(interval: nil, time_at: nil)
#
def schedule(**args)
# Evaluate when to schedule the job
time_at = schedule_time(args)
time_at = schedule_time(**args)

# Schedule job through client middlewares
Cloudtasker.config.client_middleware.invoke(self, time_at: time_at) do
Expand Down
2 changes: 1 addition & 1 deletion lib/cloudtasker/worker_wrapper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class WorkerWrapper
#
def initialize(worker_name:, **opts)
@worker_name = worker_name
super(opts)
super(**opts)
end

#
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
include_context 'of Cloudtasker ActiveJob instantiation'

subject :worker do
described_class.new(example_job_wrapper_args.merge(task_id: '00000001'))
described_class.new(**example_job_wrapper_args.merge(task_id: '00000001'))
end

let :example_unreconstructed_job_serialization do
Expand Down
2 changes: 1 addition & 1 deletion spec/cloudtasker/backend/google_cloud_task_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
let(:client) { instance_double('Google::Cloud::Tasks::V2beta3::Task') }

describe '.setup_queue' do
subject { described_class.setup_queue(opts) }
subject { described_class.setup_queue(**opts) }

let(:opts) { { name: relative_queue, concurrency: 20, retries: 100 } }
let(:queue) { instance_double('Google::Cloud::Tasks::V2beta3::Queue') }
Expand Down
10 changes: 5 additions & 5 deletions spec/cloudtasker/backend/memory_task_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@
let(:worker_name) { 'TestWorker' }
let(:worker_name2) { 'TestWorker2' }
let(:task_id) { '1234' }
let(:task) { described_class.new(job_payload.merge(id: task_id)) }
let(:task) { described_class.new(**job_payload.merge(id: task_id)) }
let(:task_id2) { '2434' }
let(:task2) { described_class.new(job_payload2.merge(id: task_id2)) }
let(:task2) { described_class.new(**job_payload2.merge(id: task_id2)) }

before { described_class.clear }

Expand Down Expand Up @@ -160,7 +160,7 @@
end

describe '.new' do
subject { described_class.new(job_payload.merge(id: id)) }
subject { described_class.new(**job_payload.merge(id: id)) }

let(:id) { '123' }
let(:expected_attrs) do
Expand Down Expand Up @@ -251,11 +251,11 @@
subject { task }

context 'with same id' do
it { is_expected.to eq(described_class.new(job_payload.merge(id: task_id))) }
it { is_expected.to eq(described_class.new(**job_payload.merge(id: task_id))) }
end

context 'with different id' do
it { is_expected.not_to eq(described_class.new(job_payload.merge(id: task_id + 'a'))) }
it { is_expected.not_to eq(described_class.new(**job_payload.merge(id: task_id + 'a'))) }
end

context 'with different object' do
Expand Down
Loading

0 comments on commit 05471e2

Please sign in to comment.