Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(perf): improve delete_from_sorted_set speed #837

Closed
wants to merge 2 commits into the base branch from a fork branch (branch names not captured)
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions lib/sidekiq_unique_jobs/lua/shared/_delete_from_queue.lua
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
-- Remove the first job payload in `queue` (a Redis list) whose JSON body
-- contains `digest`, scanning the list in pages of `per` entries so the
-- script never materializes the whole list at once.
--
-- @param queue  name of the Redis list to search
-- @param digest unique-jobs digest to look for inside each payload
-- @return the removed item, or nil when no entry matched
local function delete_from_queue(queue, digest)
  local per    = 50
  local total  = redis.call("LLEN", queue)
  local result = nil

  -- Numeric for replaces manual index bookkeeping; when index reaches a
  -- region past the end of the list, LRANGE returns an empty batch and we
  -- stop early.
  for index = 0, total, per do
    local items = redis.call("LRANGE", queue, index, index + per - 1)
    if #items == 0 then
      break
    end
    for _, item in ipairs(items) do
      -- Plain-text search (4th arg true): the digest is literal text, so
      -- Lua pattern magic characters in it must not affect matching.
      if string.find(item, digest, 1, true) then
        redis.call("LREM", queue, 1, item)
        result = item
        break
      end
    end
    -- Inner break only exits the batch loop; stop paging once found.
    if result then break end
  end

  return result
end
21 changes: 12 additions & 9 deletions lib/sidekiq_unique_jobs/lua/shared/_delete_from_sorted_set.lua
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
-- Remove the first member of the sorted set `name` whose payload contains
-- `digest`. Uses ZSCAN with a MATCH glob so Redis filters server side,
-- instead of paging the entire set back to the script with ZRANGE.
--
-- @param name   sorted set key (e.g. "schedule" or "retry")
-- @param digest unique-jobs digest to look for
-- @return the removed member, or nil when nothing matched
local function delete_from_sorted_set(name, digest)
  local cursor = "0"
  local result = nil
  local match_pattern = "*" .. digest .. "*"
  -- COUNT is a per-call work hint; a batch of 50 keeps the number of
  -- round trips low (COUNT 1 would examine one bucket per call) while the
  -- full-iteration guarantee of SCAN is unaffected by its value.
  local per = 50

  repeat
    local scan_result = redis.call("ZSCAN", name, cursor, "MATCH", match_pattern, "COUNT", per)
    cursor = scan_result[1]
    -- ZSCAN returns a flat array: member, score, member, score, ...
    local items = scan_result[2]

    -- Step by 2 to visit members only, skipping the interleaved scores.
    for i = 1, #items, 2 do
      local item = items[i]
      -- Plain find: treat the digest as literal text, not a Lua pattern.
      if string.find(item, digest, 1, true) then
        redis.call("ZREM", name, item)
        result = item
        break
      end
    end
  until cursor == "0" or result ~= nil

  return result
end
57 changes: 57 additions & 0 deletions spec/performance/unique_job_on_conflict_replace_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# frozen_string_literal: true

# Performance regression spec: deleting/replacing a locked job must stay fast
# even when the Redis "schedule" sorted set holds ~100k entries.
RSpec.describe UniqueJobOnConflictReplace, :perf do
  let(:lock_prefix) { SidekiqUniqueJobs.config.lock_prefix }
  let(:lock_timeout) { SidekiqUniqueJobs.config.lock_timeout }
  let(:lock_ttl) { SidekiqUniqueJobs.config.lock_ttl }
  let(:queue) { described_class.sidekiq_options["queue"] }
  let(:on_conflict) { described_class.sidekiq_options["on_conflict"] }
  let(:lock) { described_class.sidekiq_options["lock"] }

  before do
    # Start from a clean slate so digests left by other examples cannot
    # interfere with this run.
    digests.delete_by_pattern("*")
  end

  context "when schedule queue is large" do
    it "locks and replaces quickly" do
      # Seed ~100k scheduled jobs directly into the "schedule" sorted set,
      # pipelined in slices of 1_000 so the seeding itself is fast.
      (0..100_000).each_slice(1_000) do |nums|
        redis do |conn|
          conn.pipelined do |pipeline|
            nums.each do |num|
              created_at = Time.now.to_f
              # Score is a random time between 1 hour and 30 days from now.
              scheduled_at = created_at + rand(3_600..2_592_000)

              payload = {
                "retry" => true,
                "queue" => queue,
                "lock" => lock,
                "on_conflict" => on_conflict,
                "class" => described_class.name,
                "args" => [num, { "type" => "extremely unique" }],
                "jid" => SecureRandom.hex(12),
                "created_at" => created_at,
                "lock_timeout" => lock_timeout,
                "lock_ttl" => lock_ttl,
                "lock_prefix" => lock_prefix,
                "lock_args" => [num, { "type" => "extremely unique" }],
                # Random digest so none of the seeded entries share a lock
                # with the job enqueued below.
                "lock_digest" => "#{lock_prefix}:#{SecureRandom.hex}",
              }

              pipeline.zadd("schedule", scheduled_at, payload.to_json)
            end
          end
        end
      end

      # queueing it once at the end of the queue should succeed
      expect(described_class.perform_in(2_592_000, 100_000, { "type" => "extremely unique" })).not_to be_nil

      # queueing it again should be performant
      # NOTE(review): the 0.1s budget presumes the ZSCAN-based delete; the
      # old full-ZRANGE pagination would blow this timeout — confirm budget
      # is realistic on CI hardware.
      expect do
        Timeout.timeout(0.1) do
          described_class.perform_in(2_592_000, 100_000, { "type" => "extremely unique" })
        end
      end.not_to raise_error
    end
  end
end
14 changes: 14 additions & 0 deletions spec/support/workers/unique_job_on_conflict_replace.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# frozen_string_literal: true

# :nocov:

# Worker fixture: takes an `until_executing` lock and, on conflict, replaces
# the already-enqueued job. Exercised by the on_conflict/perf specs.
class UniqueJobOnConflictReplace
  include Sidekiq::Worker

  sidekiq_options lock: :until_executing, queue: :customqueue, on_conflict: :replace

  # Echo the two arguments back as an array (useful for spec assertions).
  def perform(one, two)
    [one, two]
  end
end
18 changes: 18 additions & 0 deletions spec/workers/unique_job_on_conflict_replace_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# frozen_string_literal: true

# Verifies the UniqueJobOnConflictReplace test worker carries exactly the
# sidekiq_options the other specs rely on, and that it performs cleanly.
RSpec.describe UniqueJobOnConflictReplace do
  # Shared example asserting the worker's effective sidekiq_options hash.
  it_behaves_like "sidekiq with options" do
    let(:options) do
      {
        "lock" => :until_executing,
        "on_conflict" => :replace,
        "queue" => :customqueue,
        "retry" => true,
      }
    end
  end

  # Shared example asserting #perform runs without raising for these args.
  it_behaves_like "a performing worker" do
    let(:args) { ["hundred", { "type" => "extremely unique", "id" => 44 }] }
  end
end
Loading