Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(digest): write digest on middleware call #774

Merged
merged 8 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ jobs:
- uses: actions/checkout@v4
- uses: ruby/setup-ruby@v1
with:
ruby-version: 3.1
bundler: 2.4.12
ruby-version: 3.2
bundler-cache: true
- run: bin/bundle --jobs=$(nproc) --retry=$(nproc)
- run: bin/rubocop -P
6 changes: 3 additions & 3 deletions .github/workflows/rspec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ jobs:
- uses: ruby/setup-ruby@v1
with:
ruby-version: 3.2
bundler: 2.4.12
bundler-cache: true

- name: Install Code Climate reporter
Expand Down Expand Up @@ -59,16 +58,17 @@ jobs:
strategy:
fail-fast: true
matrix:
ruby: [2.7, '3.0', 3.1, 3.2]
ruby: ["2.7", '3.0', "3.1", "3.2", "3.3"]
gemfile:
- sidekiq_7.0
- sidekiq_7.1
- sidekiq_7.2

steps:
- uses: actions/checkout@v4
- uses: ruby/setup-ruby@v1
with:
ruby-version: ${{ matrix.ruby }}
bundler: 2.4.12
bundler-cache: true
- run: >-
REDIS_HOST=localhost
Expand Down
3 changes: 3 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ Layout/EndAlignment:
Layout/LineContinuationLeadingSpace:
Enabled: false

Layout/MultilineMethodCallIndentation:
EnforcedStyle: indented

Lint/AmbiguousBlockAssociation:
Exclude:
- spec/**/*
Expand Down
8 changes: 8 additions & 0 deletions Appraisals
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,11 @@
appraise "sidekiq-7.0" do
gem "sidekiq", "~> 7.0.0"
end

appraise "sidekiq-7.1" do
gem "sidekiq", "~> 7.1.0"
end

appraise "sidekiq-7.2" do
gem "sidekiq", "~> 7.2.0"
end
28 changes: 28 additions & 0 deletions gemfiles/sidekiq_7.1.gemfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# This file was generated by Appraisal

source "https://rubygems.org"

gem "appraisal"
gem "faraday-retry"
gem "gem-release"
gem "github-markup"
gem "rack-test"
gem "rake", "13.0.3"
gem "reek", ">= 5.3"
gem "rspec"
gem "rspec-benchmark"
gem "rspec-html-matchers"
gem "rspec-its"
gem "rubocop-mhenrixon"
gem "simplecov-sublime", ">= 0.21.2", require: false
gem "sinatra"
gem "timecop"
gem "toxiproxy"
gem "yard"
gem "sidekiq", "~> 7.0.0"

platforms :mri do
gem "concurrent-ruby-ext"
end

gemspec path: "../"
28 changes: 28 additions & 0 deletions gemfiles/sidekiq_7.2.gemfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# This file was generated by Appraisal

source "https://rubygems.org"

gem "appraisal"
gem "faraday-retry"
gem "gem-release"
gem "github-markup"
gem "rack-test"
gem "rake", "13.0.3"
gem "reek", ">= 5.3"
gem "rspec"
gem "rspec-benchmark"
gem "rspec-html-matchers"
gem "rspec-its"
gem "rubocop-mhenrixon"
gem "simplecov-sublime", ">= 0.21.2", require: false
gem "sinatra"
gem "timecop"
gem "toxiproxy"
gem "yard"
gem "sidekiq", "~> 7.0.0"

platforms :mri do
gem "concurrent-ruby-ext"
end

gemspec path: "../"
8 changes: 4 additions & 4 deletions lib/sidekiq_unique_jobs/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ class Config < ThreadSafeConfig
# @return [Hash<Symbol, SidekiqUniqueJobs::Lock::BaseLock] all available default locks
LOCKS =
LOCKS_WHEN_BUSY.dup
.merge(LOCKS_WHILE_ENQUEUED.dup)
.merge(LOCKS_WITHOUT_UNLOCK.dup)
.merge(LOCKS_FROM_PUSH_TO_PROCESSED.dup)
.freeze
.merge(LOCKS_WHILE_ENQUEUED.dup)
.merge(LOCKS_WITHOUT_UNLOCK.dup)
.merge(LOCKS_FROM_PUSH_TO_PROCESSED.dup)
.freeze

#
# @return [Hash<Symbol, SidekiqUniqueJobs::OnConflict::Strategy] all available default strategies
Expand Down
8 changes: 4 additions & 4 deletions lib/sidekiq_unique_jobs/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,19 @@ def add_lock_ttl(item)
end

def add_lock_timeout(item)
item[LOCK_TIMEOUT] ||= SidekiqUniqueJobs::LockTimeout.calculate(item)
item[LOCK_TIMEOUT] = SidekiqUniqueJobs::LockTimeout.calculate(item)
end

def add_lock_args(item)
item[LOCK_ARGS] ||= SidekiqUniqueJobs::LockArgs.call(item)
item[LOCK_ARGS] = SidekiqUniqueJobs::LockArgs.call(item)
end

def add_lock_digest(item)
item[LOCK_DIGEST] ||= SidekiqUniqueJobs::LockDigest.call(item)
item[LOCK_DIGEST] = SidekiqUniqueJobs::LockDigest.call(item)
end

def add_lock_prefix(item)
item[LOCK_PREFIX] ||= SidekiqUniqueJobs.config.lock_prefix
item[LOCK_PREFIX] = SidekiqUniqueJobs.config.lock_prefix
end

def add_lock_type(item)
Expand Down
4 changes: 2 additions & 2 deletions lib/sidekiq_unique_jobs/locksmith.rb
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,8 @@ def primed_async(conn, wait = nil, &block) # rubocop:disable Metrics/MethodLengt

# NOTE: When debugging, change .value to .value!
primed_jid = Concurrent::Promises
.future(conn) { |red_con| pop_queued(red_con, timeout) }
.value
.future(conn) { |red_con| pop_queued(red_con, timeout) }
.value

handle_primed(primed_jid, &block)
end
Expand Down
25 changes: 16 additions & 9 deletions lib/sidekiq_unique_jobs/lua/unlock.lua
Original file line number Diff line number Diff line change
Expand Up @@ -75,21 +75,28 @@ if lock_type ~= "until_expired" then
redis.call("HDEL", locked, job_id)
end

if redis.call("LLEN", primed) == 0 then
log_debug("UNLINK", primed)
redis.call("UNLINK", primed)
end

local locked_count = redis.call("HLEN", locked)

if locked_count and locked_count < 1 then
if locked_count < 1 then
log_debug("UNLINK", locked)
redis.call("UNLINK", locked)
end

if redis.call("LLEN", primed) == 0 then
log_debug("UNLINK", primed)
redis.call("UNLINK", primed)
end

if limit and limit <= 1 and locked_count and locked_count <= 1 then
log_debug("ZREM", digests, digest)
redis.call("ZREM", digests, digest)
if limit then
if limit <= 1 and locked_count <= 1 then
log_debug("ZREM", digests, digest)
redis.call("ZREM", digests, digest)
end
else
if locked_count <= 1 then
log_debug("ZREM", digests, digest)
redis.call("ZREM", digests, digest)
end
end

log_debug("LPUSH", queued, "1")
Expand Down
10 changes: 5 additions & 5 deletions lib/sidekiq_unique_jobs/middleware.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,23 @@ module Middleware
# This method runs before (prepended) the actual middleware implementation.
# This is done to reduce duplication
#
# @param [Sidekiq::Worker] worker_class
# @param [Sidekiq::Job] worker_class
# @param [Hash] item a sidekiq job hash
# @param [String] queue name of the queue
# @param [ConnectionPool] redis_pool only used for compatility reasons
#
# @return [yield<super>] <description>
# @return [yield<super>] call the rest of the middleware stack
#
# @yieldparam [<type>] if <description>
# @yieldreturn [<type>] <describe what yield should return>
# @yieldparam [void] if uniquejobs is disable
# @yieldreturn [void] delegate back to other sidekiq middleware
def call(worker_class, item, queue, redis_pool = nil)
@item = item
@queue = queue
@redis_pool = redis_pool
self.job_class = worker_class
return yield if unique_disabled?

SidekiqUniqueJobs::Job.prepare(item) unless item[LOCK_DIGEST]
SidekiqUniqueJobs::Job.prepare(item)

with_logging_context do
super
Expand Down
2 changes: 1 addition & 1 deletion lib/sidekiq_unique_jobs/middleware/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ module Middleware
#
# @author Mikael Henriksson <[email protected]>
class Client
include Sidekiq::ClientMiddleware if defined?(Sidekiq::ClientMiddleware)
include Sidekiq::ClientMiddleware

# prepend "SidekiqUniqueJobs::Middleware"
# @!parse prepends SidekiqUniqueJobs::Middleware
Expand Down
2 changes: 1 addition & 1 deletion lib/sidekiq_unique_jobs/middleware/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ module Middleware
#
# @author Mikael Henriksson <[email protected]>
class Server
include Sidekiq::ServerMiddleware if defined?(Sidekiq::ServerMiddleware)
include Sidekiq::ServerMiddleware

# prepend "SidekiqUniqueJobs::Middleware"
# @!parse prepends SidekiqUniqueJobs::Middleware
Expand Down
48 changes: 13 additions & 35 deletions lib/sidekiq_unique_jobs/sidekiq_unique_ext.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,46 +68,24 @@ def delete(score, job_id)
prepend UniqueExtension
end

if Sidekiq.const_defined?(:JobRecord)
# See Sidekiq::Api
class JobRecord
#
# Provides extensions for unlocking jobs that are removed and deleted
# See Sidekiq::Api
class JobRecord
#
# Provides extensions for unlocking jobs that are removed and deleted
#
# @author Mikael Henriksson <[email protected]>
#
module UniqueExtension
#
# @author Mikael Henriksson <[email protected]>
# Wraps the original method to ensure locks for the job are deleted
#
module UniqueExtension
#
# Wraps the original method to ensure locks for the job are deleted
#
def delete
SidekiqUniqueJobs::Unlockable.delete!(item)
super
end
def delete
SidekiqUniqueJobs::Unlockable.delete!(item)
super
end

prepend UniqueExtension
end
else
# See Sidekiq::Api
class Job
#
# Provides extensions for unlocking jobs that are removed and deleted
#
# @author Mikael Henriksson <[email protected]>
#
module UniqueExtension
#
# Wraps the original method to ensure locks for the job are deleted
#
def delete
SidekiqUniqueJobs::Unlockable.delete!(item)
super
end
end

prepend UniqueExtension
end
prepend UniqueExtension
end

# See Sidekiq::Api
Expand Down
6 changes: 3 additions & 3 deletions lib/sidekiq_unique_jobs/sidekiq_unique_jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,9 @@ def validate_worker!(options)
# Attempt to constantize a string worker_class argument, always
# failing back to the original argument when the constant can't be found
#
# @return [Sidekiq::Worker]
# @return [Sidekiq::Job]
def constantize(str)
return str.class if str.is_a?(Sidekiq::Worker) # sidekiq v6.x
return str.class if str.is_a?(Sidekiq::Job) # sidekiq v6.x
return str unless str.is_a?(String)
return Object.const_get(str) unless str.include?("::")

Expand All @@ -269,7 +269,7 @@ def constantize(str)
# Attempt to constantize a string worker_class argument, always
# failing back to the original argument when the constant can't be found
#
# @return [Sidekiq::Worker, String]
# @return [Sidekiq::Job, String]
def safe_constantize(str)
constantize(str)
rescue NameError => ex
Expand Down
6 changes: 3 additions & 3 deletions lib/sidekiq_unique_jobs/sidekiq_worker_methods.rb
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
# frozen_string_literal: true

module SidekiqUniqueJobs
# Module with convenience methods for the Sidekiq::Worker class
# Module with convenience methods for the Sidekiq::Job class
#
# @author Mikael Henriksson <[email protected]>
module SidekiqWorkerMethods
#
# @!attribute [r] job_class
# @return [Sidekiq::Worker] The Sidekiq::Worker implementation
# @return [Sidekiq::Job] The Sidekiq::Job implementation
attr_reader :job_class

# Avoids duplicating worker_class.respond_to? in multiple places
Expand Down Expand Up @@ -62,7 +62,7 @@ def after_unlock_hook # rubocop:disable Metrics/MethodLength
# Attempt to constantize a string worker_class argument, always
# failing back to the original argument when the constant can't be found
#
# @return [Sidekiq::Worker]
# @return [Sidekiq::Job]
def job_class_constantize(klazz = @job_class)
SidekiqUniqueJobs.safe_constantize(klazz)
end
Expand Down
8 changes: 4 additions & 4 deletions lib/sidekiq_unique_jobs/testing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ def self.use_options(tmp_config = {}) # rubocop:disable Metrics/MethodLength
end

#
# See Sidekiq::Worker in Sidekiq gem for more details
# See Sidekiq::Job in Sidekiq gem for more details
#
module Worker
#
# Adds class methods to Sidekiq::Worker
# Adds class methods to Sidekiq::Job
#
module ClassMethods
#
Expand Down Expand Up @@ -110,14 +110,14 @@ def clear
prepend Overrides

#
# Prepends methods to Sidekiq::Worker
# Prepends methods to Sidekiq::Job
#
module ClassMethods
prepend Overrides::ClassMethods
end

#
# Prepends singleton methods to Sidekiq::Worker
# Prepends singleton methods to Sidekiq::Job
#
module SignletonOverrides
#
Expand Down
Loading