Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add digest scores for faster deletes in sorted sets #835

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions lib/sidekiq_unique_jobs/locksmith.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def delete
# Deletes the lock regardless of if it has a pttl set
#
def delete!
call_script(:delete, key.to_a, [job_id, config.pttl, config.type, config.limit]).to_i.positive?
call_script(:delete, key.to_a, argv).to_i.positive?
end

#
Expand Down Expand Up @@ -362,7 +362,11 @@ def taken?(conn)
end

def argv
[job_id, config.pttl, config.type, config.limit]
[job_id, config.pttl, config.type, config.limit, lock_score]
end

def lock_score
item[AT].to_s
end

def lock_info
Expand All @@ -375,6 +379,7 @@ def lock_info
TYPE => config.type,
LOCK_ARGS => item[LOCK_ARGS],
TIME => now_f,
AT => item[AT],
)
end

Expand Down
11 changes: 6 additions & 5 deletions lib/sidekiq_unique_jobs/lua/delete.lua
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ local job_id = ARGV[1]
local pttl = tonumber(ARGV[2])
local lock_type = ARGV[3]
local limit = tonumber(ARGV[4])
local lock_score = ARGV[5]
-------- END lock arguments -----------

-------- BEGIN injected arguments --------
local current_time = tonumber(ARGV[5])
local debug_lua = tostring(ARGV[6]) == "1"
local max_history = tonumber(ARGV[7])
local script_name = tostring(ARGV[8]) .. ".lua"
local redisversion = tostring(ARGV[9])
local current_time = tonumber(ARGV[6])
local debug_lua = tostring(ARGV[7]) == "1"
local max_history = tonumber(ARGV[8])
local script_name = tostring(ARGV[9]) .. ".lua"
local redisversion = tostring(ARGV[10])
--------- END injected arguments ---------

-------- BEGIN local functions --------
Expand Down
23 changes: 16 additions & 7 deletions lib/sidekiq_unique_jobs/lua/lock.lua
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@ local job_id = ARGV[1]
local pttl = tonumber(ARGV[2])
local lock_type = ARGV[3]
local limit = tonumber(ARGV[4])
local lock_score = ARGV[5]
-------- END lock arguments -----------


-------- BEGIN injected arguments --------
local current_time = tonumber(ARGV[5])
local debug_lua = tostring(ARGV[6]) == "1"
local max_history = tonumber(ARGV[7])
local script_name = tostring(ARGV[8]) .. ".lua"
local redisversion = ARGV[9]
local current_time = tonumber(ARGV[6])
local debug_lua = tostring(ARGV[7]) == "1"
local max_history = tonumber(ARGV[8])
local script_name = tostring(ARGV[9]) .. ".lua"
local redisversion = ARGV[10]
--------- END injected arguments ---------


Expand Down Expand Up @@ -62,8 +63,16 @@ if lock_type == "until_expired" and pttl and pttl > 0 then
log_debug("ZADD", expiring_digests, current_time + pttl, digest)
redis.call("ZADD", expiring_digests, current_time + pttl, digest)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left this alone because I don't fully understand what the expiring digests set is used for and if this optimization would be applicable to jobs utilizing an until_expired lock strategy.

else
log_debug("ZADD", digests, current_time, digest)
redis.call("ZADD", digests, current_time, digest)
local score

if #lock_score == 0 then
score = current_time
else
score = lock_score
end

log_debug("ZADD", digests, score, digest)
redis.call("ZADD", digests, score, digest)
end

log_debug("HSET", locked, job_id, current_time)
Expand Down
17 changes: 9 additions & 8 deletions lib/sidekiq_unique_jobs/lua/queue.lua
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,19 @@ local digests = KEYS[7]


-------- BEGIN lock arguments ---------
local job_id = ARGV[1] -- The job_id that was previously primed
local pttl = tonumber(ARGV[2])
local lock_type = ARGV[3]
local limit = tonumber(ARGV[4])
local job_id = ARGV[1] -- The job_id that was previously primed
local pttl = tonumber(ARGV[2])
local lock_type = ARGV[3]
local limit = tonumber(ARGV[4])
local lock_score = ARGV[5]
-------- END lock arguments -----------


-------- BEGIN injected arguments --------
local current_time = tonumber(ARGV[5])
local debug_lua = tostring(ARGV[6]) == "1"
local max_history = tonumber(ARGV[7])
local script_name = tostring(ARGV[8]) .. ".lua"
local current_time = tonumber(ARGV[6])
local debug_lua = tostring(ARGV[7]) == "1"
local max_history = tonumber(ARGV[8])
local script_name = tostring(ARGV[9]) .. ".lua"
--------- END injected arguments ---------


Expand Down
20 changes: 10 additions & 10 deletions lib/sidekiq_unique_jobs/lua/shared/_delete_from_queue.lua
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
local function delete_from_queue(queue, digest)
local per = 50
local total = redis.call("LLEN", queue)
local index = 0
local result = nil
local total = redis.call("LLEN", queue)
local per = 50

for index = 0, total, per do
local items = redis.call("LRANGE", queue, index, index + per - 1)

while (index < total) do
local items = redis.call("LRANGE", queue, index, index + per -1)
if #items == 0 then
break
end

for _, item in pairs(items) do
if string.find(item, digest) then
redis.call("LREM", queue, 1, item)
result = item
break

return item
end
end
index = index + per
end
return result

return nil
end
30 changes: 20 additions & 10 deletions lib/sidekiq_unique_jobs/lua/shared/_delete_from_sorted_set.lua
Original file line number Diff line number Diff line change
@@ -1,19 +1,29 @@
local function delete_from_sorted_set(name, digest)
local per = 50
local total = redis.call("zcard", name)
local index = 0
local result
local score = redis.call("ZSCORE", "uniquejobs:digests", digest)
local total = redis.call("ZCARD", name)
local per = 50

for offset = 0, total, per do
local items

if score then
items = redis.call("ZRANGE", name, score, "+inf", "BYSCORE", "LIMIT", offset, per)
else
items = redis.call("ZRANGE", name, offset, offset + per -1)
end

if #items == 0 then
break
end

while (index < total) do
local items = redis.call("ZRANGE", name, index, index + per -1)
for _, item in pairs(items) do
if string.find(item, digest) then
redis.call("ZREM", name, item)
result = item
break

return item
ezekg marked this conversation as resolved.
Show resolved Hide resolved
end
end
index = index + per
end
return result

return nil
end
19 changes: 10 additions & 9 deletions lib/sidekiq_unique_jobs/lua/unlock.lua
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,20 @@ local digests = KEYS[7]


-------- BEGIN lock arguments ---------
local job_id = ARGV[1]
local pttl = tonumber(ARGV[2])
local lock_type = ARGV[3]
local limit = tonumber(ARGV[4])
local job_id = ARGV[1]
local pttl = tonumber(ARGV[2])
local lock_type = ARGV[3]
local limit = tonumber(ARGV[4])
local lock_score = ARGV[5]
-------- END lock arguments -----------


-------- BEGIN injected arguments --------
local current_time = tonumber(ARGV[5])
local debug_lua = tostring(ARGV[6]) == "1"
local max_history = tonumber(ARGV[7])
local script_name = tostring(ARGV[8]) .. ".lua"
local redisversion = ARGV[9]
local current_time = tonumber(ARGV[6])
local debug_lua = tostring(ARGV[7]) == "1"
local max_history = tonumber(ARGV[8])
local script_name = tostring(ARGV[9]) .. ".lua"
local redisversion = ARGV[10]
--------- END injected arguments ---------


Expand Down
55 changes: 55 additions & 0 deletions spec/performance/unique_job_on_conflict_replace_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# frozen_string_literal: true

RSpec.describe UniqueJobOnConflictReplace, :perf do
let(:lock_prefix) { described_class.sidekiq_options.fetch("lock_prefix") { SidekiqUniqueJobs.config.lock_prefix } }
let(:lock_timeout) { described_class.sidekiq_options.fetch("lock_timeout") { SidekiqUniqueJobs.config.lock_timeout } }
let(:lock_ttl) { described_class.sidekiq_options.fetch("lock_ttl") { SidekiqUniqueJobs.config.lock_ttl } }
let(:queue) { described_class.sidekiq_options["queue"] }
let(:on_conflict) { described_class.sidekiq_options["on_conflict"] }
let(:lock) { described_class.sidekiq_options["lock"] }

before do
flushdb
end

context "when schedule queue is large" do
it "locks and replaces quickly" do
(0..100_000).each_slice(1_000) do |nums|
redis do |conn|
conn.pipelined do |pipeline|
nums.each do |num|
created_at = Time.now.to_f
scheduled_at = created_at + rand(3_600..2_592_000)

payload = {
"retry" => true,
"queue" => queue,
"lock" => lock,
"on_conflict" => on_conflict,
"class" => described_class.name,
"args" => [num, { "type" => "extremely unique" }],
"jid" => SecureRandom.hex(12),
"created_at" => created_at,
"lock_timeout" => lock_timeout,
"lock_ttl" => lock_ttl,
"lock_prefix" => lock_prefix,
"lock_args" => [num, { "type" => "extremely unique" }],
"lock_digest" => "#{lock_prefix}:#{SecureRandom.hex}",
}

pipeline.zadd("schedule", scheduled_at, payload.to_json)
end
end
end
end

# queueing it once at the end of the queue should succeed
expect(described_class.perform_in(2_592_000, 100_000, { "type" => "extremely unique" })).not_to be_nil

# queueing it again should be performant
expect do
described_class.perform_in(2_592_000, 100_000, { "type" => "extremely unique" })
end.to perform_under(10).ms
end
end
end
3 changes: 3 additions & 0 deletions spec/sidekiq_unique_jobs/lua/delete_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
lock_ttl,
lock_type,
lock_limit,
lock_score,
]
end
let(:job_id) { "jobid" }
Expand All @@ -22,6 +23,8 @@
let(:lock_ttl) { nil }
let(:locked_jid) { job_id }
let(:lock_limit) { 1 }
let(:now_f) { SidekiqUniqueJobs.now_f }
let(:lock_score) { now_f.to_s }

context "when queued" do
before do
Expand Down
5 changes: 3 additions & 2 deletions spec/sidekiq_unique_jobs/lua/lock_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
RSpec.describe "lock.lua" do
subject(:lock) { call_script(:lock, key.to_a, argv_one) }

let(:argv_one) { [job_id_one, lock_ttl, lock_type, lock_limit] }
let(:argv_two) { [job_id_two, lock_ttl, lock_type, lock_limit] }
let(:argv_one) { [job_id_one, lock_ttl, lock_type, lock_limit, lock_score] }
let(:argv_two) { [job_id_two, lock_ttl, lock_type, lock_limit, lock_score] }
let(:job_id_one) { "job_id_one" }
let(:job_id_two) { "job_id_two" }
let(:lock_type) { :until_executed }
Expand All @@ -19,6 +19,7 @@
let(:locked_jid) { job_id_one }
let(:now_f) { SidekiqUniqueJobs.now_f }
let(:lock_limit) { 1 }
let(:lock_score) { now_f.to_s }

shared_context "with a primed key", :with_primed_key do
before do
Expand Down
10 changes: 6 additions & 4 deletions spec/sidekiq_unique_jobs/lua/queue_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
lock_pttl,
lock_type,
lock_limit,
lock_score,
]
end
let(:digest) { "uniquejobs:digest" }
Expand All @@ -21,6 +22,7 @@
let(:locked_jid) { job_id }
let(:lock_limit) { 1 }
let(:now_f) { SidekiqUniqueJobs.now_f }
let(:lock_score) { now_f.to_s }

before do
flush_redis
Expand Down Expand Up @@ -54,7 +56,7 @@

context "when queued by another job_id" do
before do
call_script(:queue, key.to_a, [job_id_two, lock_pttl, lock_type, lock_limit])
call_script(:queue, key.to_a, [job_id_two, lock_pttl, lock_type, lock_limit, lock_score])
end

context "with lock_limit 1" do
Expand Down Expand Up @@ -94,7 +96,7 @@

context "when queued by same job_id" do
before do
call_script(:queue, key.to_a, [job_id_one, lock_pttl, lock_type, lock_limit])
call_script(:queue, key.to_a, [job_id_one, lock_pttl, lock_type, lock_limit, lock_score])
end

it "stores the right keys in redis" do
Expand All @@ -113,9 +115,9 @@

context "when primed by another job_id" do
before do
call_script(:queue, key.to_a, [job_id_two, lock_pttl, lock_type, lock_limit])
call_script(:queue, key.to_a, [job_id_two, lock_pttl, lock_type, lock_limit, lock_score])
rpoplpush(key.queued, key.primed)
call_script(:lock, key.to_a, [job_id_two, lock_pttl, lock_type, lock_limit])
call_script(:lock, key.to_a, [job_id_two, lock_pttl, lock_type, lock_limit, lock_score])
end

context "with lock_limit 1" do
Expand Down
5 changes: 3 additions & 2 deletions spec/sidekiq_unique_jobs/lua/unlock_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
RSpec.describe "unlock.lua" do
subject(:unlock) { call_script(:unlock, key.to_a, argv_one) }

let(:argv_one) { [job_id_one, lock_ttl, lock_type, lock_limit] }
let(:argv_two) { [job_id_two, lock_ttl, lock_type, lock_limit] }
let(:argv_one) { [job_id_one, lock_ttl, lock_type, lock_limit, lock_score] }
let(:argv_two) { [job_id_two, lock_ttl, lock_type, lock_limit, lock_score] }
let(:job_id_one) { "job_id_one" }
let(:job_id_two) { "job_id_two" }
let(:lock_type) { :until_executed }
Expand All @@ -17,6 +17,7 @@
let(:lock_ttl) { nil }
let(:locked_jid) { job_id_one }
let(:lock_limit) { 1 }
let(:lock_score) { now_f.to_s }

shared_context "with a lock", :with_a_lock do
before do
Expand Down
8 changes: 4 additions & 4 deletions spec/support/sidekiq_unique_jobs/testing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ def debug(*args)
redis { |conn| conn.debug(*args) }
end

def flushall(options = nil)
redis { |conn| conn.flushall(options) }
def flushall(...)
redis { |conn| conn.flushall(...) }
end

def flushdb(options = nil)
redis { |conn| conn.flushdb(options) }
def flushdb(...)
ezekg marked this conversation as resolved.
Show resolved Hide resolved
redis { |conn| conn.flushdb(...) }
end

def info(_cmd = nil)
Expand Down
Loading