Skip to content

Commit

Permalink
Much cleaner redis (& general) config.
Browse files Browse the repository at this point in the history
  • Loading branch information
nickelser committed Oct 19, 2016
1 parent bcebe95 commit a2c3d87
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 26 deletions.
9 changes: 4 additions & 5 deletions lib/zhong.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,27 @@ def self.schedule(&block)
end

def self.scheduler
@scheduler ||= Scheduler.new(logger: logger, redis: redis, tz: tz, heartbeat_key: heartbeat_key)
@scheduler ||= Scheduler.new
end

def self.any_running?(grace = 60.seconds)
latest_heartbeat > (redis_time - grace)
latest_heartbeat && latest_heartbeat > (redis_time - grace)
end

def self.latest_heartbeat
all_heartbeats.map { |h| h[:last_seen] }.sort.last
end

def self.all_heartbeats
heartbeat_key = scheduler.config[:heartbeat_key]
heartbeats = Zhong.redis.hgetall(heartbeat_key)
heartbeats = redis.hgetall(heartbeat_key)
now = redis_time

old_beats, new_beats = heartbeats.partition do |_, v|
Time.at(v.to_i) < (now - 15.minutes)
end

redis.multi do
old_beats.each { |b| Zhong.redis.hdel(heartbeat_key, b) }
old_beats.each { |b| redis.hdel(heartbeat_key, b) }
end

new_beats.map do |k, v|
Expand Down
23 changes: 12 additions & 11 deletions lib/zhong/job.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
module Zhong
class Job
attr_reader :name, :category, :last_ran, :logger, :at, :every, :id
extend Forwardable
def_delegators Zhong, :redis, :tz, :logger, :heartbeat_key

attr_reader :name, :category, :last_ran, :at, :every, :id

def initialize(job_name, config = {}, callbacks = {}, &block)
@name = job_name
Expand All @@ -16,8 +19,6 @@ def initialize(job_name, config = {}, callbacks = {}, &block)

@block = block

@redis = config[:redis]
@tz = config[:tz]
@if = config[:if]
@long_running_timeout = config[:long_running_timeout]
@running = false
Expand Down Expand Up @@ -89,24 +90,24 @@ def running?
end

def refresh_last_ran
last_ran_val = @redis.get(last_ran_key)
last_ran_val = redis.get(last_ran_key)
@last_ran = last_ran_val ? Time.at(last_ran_val.to_i) : nil
end

def disable
fire_callbacks(:before_disable, self)
@redis.set(disabled_key, "true")
redis.set(disabled_key, "true")
fire_callbacks(:after_disable, self)
end

def enable
fire_callbacks(:before_enable, self)
@redis.del(disabled_key)
redis.del(disabled_key)
fire_callbacks(:after_enable, self)
end

def disabled?
!@redis.get(disabled_key).nil?
!redis.get(disabled_key).nil?
end

def to_s
Expand All @@ -120,7 +121,7 @@ def next_at
end

def clear
@redis.del(last_ran_key)
redis.del(last_ran_key)
end

def last_ran_key
Expand Down Expand Up @@ -150,7 +151,7 @@ def fire_callbacks(event, *args)
# if the @at value is changed across runs, the last_run becomes invalid
# so clear it
def clear_last_ran_if_at_changed
previous_at_msgpack = @redis.get(desired_at_key)
previous_at_msgpack = redis.get(desired_at_key)

if previous_at_msgpack
previous_at = At.deserialize(previous_at_msgpack)
Expand All @@ -161,7 +162,7 @@ def clear_last_ran_if_at_changed
end
end

@redis.set(desired_at_key, @at.serialize)
redis.set(desired_at_key, @at.serialize)
end

def run_every?(time)
Expand All @@ -178,7 +179,7 @@ def run_if?(time)

def ran!(time)
@last_ran = time
@redis.set(last_ran_key, @last_ran.to_i)
redis.set(last_ran_key, @last_ran.to_i)
end

def redis_lock
Expand Down
22 changes: 12 additions & 10 deletions lib/zhong/scheduler.rb
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
module Zhong
class Scheduler
attr_reader :config, :redis, :jobs, :logger
extend Forwardable

def_delegators Zhong, :redis, :tz, :logger, :heartbeat_key
attr_reader :jobs

DEFAULT_CONFIG = {
timeout: 0.5,
grace: 15.minutes,
long_running_timeout: 5.minutes,
tz: nil
long_running_timeout: 5.minutes
}.freeze

def initialize(config = {})
@jobs = {}
@callbacks = {}
@config = DEFAULT_CONFIG.merge(config)

@logger = @config[:logger]
@redis = @config[:redis]
@tz = @config[:tz]
@category = nil
@error_handler = nil
@running = false
Expand Down Expand Up @@ -56,7 +55,10 @@ def error_handler(&block)
end

def on(event, &block)
raise "unknown callback #{event}" unless [:before_tick, :after_tick, :before_run, :after_run, :before_disable, :after_disable, :before_enable, :after_enable].include?(event.to_sym)
unless [:before_tick, :after_tick, :before_run, :after_run, :before_disable,
:after_disable, :before_enable, :after_enable].include?(event.to_sym)
raise "unknown callback #{event}"
end
(@callbacks[event.to_sym] ||= []) << block
end

Expand Down Expand Up @@ -111,9 +113,9 @@ def find_by_name(job_name)
end

def redis_time
s, ms = @redis.time # returns [seconds since epoch, microseconds]
s, ms = redis.time # returns [seconds since epoch, microseconds]
now = Time.at(s + ms / (10**6))
@tz ? now.in_time_zone(@tz) : now
tz ? now.in_time_zone(tz) : now
end

private
Expand Down Expand Up @@ -143,7 +145,7 @@ def run_job(job, time = redis_time)
end

def heartbeat(time)
@redis.hset(config[:heartbeat_key], heartbeat_field, time.to_i)
redis.hset(heartbeat_key, heartbeat_field, time.to_i)
end

def heartbeat_field
Expand Down
20 changes: 20 additions & 0 deletions test/test_library.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,27 @@ def test_heartbeats
assert_equal true, Zhong.any_running?
assert_in_delta Zhong.redis_time.to_f, Time.now.to_f, 1
assert_in_delta Zhong.redis_time.to_f, Zhong.latest_heartbeat.to_f, 1
refute_empty Zhong.all_heartbeats
Zhong.stop
t.join
end

def test_redis_change
Zhong.schedule { nil }
t = Thread.new { Zhong.start }
sleep(1)
assert_equal true, Zhong.any_running?
test_redis = Zhong.redis
Zhong.stop
t.join
Zhong.redis = Redis.new(url: "redis://localhost/15")
refute Zhong.any_running?(5.seconds)
t = Thread.new { Zhong.start }
sleep(1)
assert_equal true, Zhong.any_running?
assert_in_delta Zhong.redis_time.to_f, Time.now.to_f, 1
Zhong.stop
Zhong.redis = test_redis
t.join
end
end

0 comments on commit a2c3d87

Please sign in to comment.