Skip to content

Commit

Permalink
fix(perf): improve delete_from_sorted_set speed
Browse files Browse the repository at this point in the history
  • Loading branch information
mhenrixon committed Feb 17, 2024
1 parent 779f297 commit 3341abe
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 9 deletions.
21 changes: 12 additions & 9 deletions lib/sidekiq_unique_jobs/lua/shared/_delete_from_sorted_set.lua
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
local function delete_from_sorted_set(name, digest)
local per = 50
local total = redis.call("zcard", name)
local index = 0
local result
local cursor = "0"
local result = nil
local match_pattern = "*" .. digest .. "*"

while (index < total) do
local items = redis.call("ZRANGE", name, index, index + per -1)
for _, item in pairs(items) do
repeat
local scan_result = redis.call("ZSCAN", name, cursor, "MATCH", match_pattern, "COUNT", 1)
cursor = scan_result[1]
local items = scan_result[2]

for i = 1, #items, 2 do
local item = items[i]
if string.find(item, digest) then
redis.call("ZREM", name, item)
result = item
break
end
end
index = index + per
end
until cursor == "0" or result ~= nil

return result
end
57 changes: 57 additions & 0 deletions spec/performance/unique_job_on_conflict_replace_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# frozen_string_literal: true

RSpec.describe UniqueJobOnConflictReplace, :perf do
let(:lock_prefix) { SidekiqUniqueJobs.config.lock_prefix }
let(:lock_timeout) { SidekiqUniqueJobs.config.lock_timeout }
let(:lock_ttl) { SidekiqUniqueJobs.config.lock_ttl }
let(:queue) { described_class.sidekiq_options["queue"] }
let(:on_conflict) { described_class.sidekiq_options["on_conflict"] }
let(:lock) { described_class.sidekiq_options["lock"] }

before do
digests.delete_by_pattern("*")
end

context "when schedule queue is large" do
it "locks and replaces quickly" do
(0..100_000).each_slice(1_000) do |nums|
redis do |conn|
conn.pipelined do |pipeline|
nums.each do |num|
created_at = Time.now.to_f
scheduled_at = created_at + rand(3_600..2_592_000)

payload = {
"retry" => true,
"queue" => queue,
"lock" => lock,
"on_conflict" => on_conflict,
"class" => described_class.name,
"args" => [num, { "type" => "extremely unique" }],
"jid" => SecureRandom.hex(12),
"created_at" => created_at,
"lock_timeout" => lock_timeout,
"lock_ttl" => lock_ttl,
"lock_prefix" => lock_prefix,
"lock_args" => [num, { "type" => "extremely unique" }],
"lock_digest" => "#{lock_prefix}:#{SecureRandom.hex}"
}

pipeline.zadd("schedule", scheduled_at, payload.to_json)
end
end
end
end

# queueing it once at the end of the queue should succeed
expect(described_class.perform_in(2_592_000, 100_000, { "type" => "extremely unique" })).not_to be_nil

# queueing it again should be performant
expect do
Timeout.timeout(0.1) do
described_class.perform_in(2_592_000, 100_000, { "type" => "extremely unique" })
end
end.not_to raise_error
end
end
end
14 changes: 14 additions & 0 deletions spec/support/workers/unique_job_on_conflict_replace.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# frozen_string_literal: true

# :nocov:

class UniqueJobOnConflictReplace
include Sidekiq::Worker
sidekiq_options lock: :until_executing,
queue: :customqueue,
on_conflict: :replace

def perform(one, two)
[one, two]
end
end
18 changes: 18 additions & 0 deletions spec/workers/unique_job_on_conflict_replace_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# frozen_string_literal: true

RSpec.describe UniqueJobOnConflictReplace do
it_behaves_like "sidekiq with options" do
let(:options) do
{
"lock" => :until_executing,
"on_conflict" => :replace,
"queue" => :customqueue,
"retry" => true,
}
end
end

it_behaves_like "a performing worker" do
let(:args) { ["hundred", { "type" => "extremely unique", "id" => 44 }] }
end
end

0 comments on commit 3341abe

Please sign in to comment.