From abe7f901ef50e2ee0ef144b2e07594f6ba333488 Mon Sep 17 00:00:00 2001 From: Nick Elser Date: Mon, 4 May 2015 23:06:48 -0700 Subject: [PATCH] initial version --- CHANGELOG.md | 3 + LICENSE.txt | 22 +++ README.md | 54 ++++--- Rakefile | 7 + lib/zhong.rb | 218 ++------------------------ lib/zhong/at.rb | 70 +++++++++ lib/zhong/every.rb | 39 +++++ lib/zhong/job.rb | 122 ++++++++++++++ lib/zhong/scheduler.rb | 76 +++++++++ test/{test_zhong.rb => zhong_test.rb} | 4 - zhong.gemspec | 14 +- 11 files changed, 390 insertions(+), 239 deletions(-) create mode 100644 CHANGELOG.md create mode 100644 LICENSE.txt create mode 100644 lib/zhong/at.rb create mode 100644 lib/zhong/every.rb create mode 100644 lib/zhong/job.rb create mode 100644 lib/zhong/scheduler.rb rename test/{test_zhong.rb => zhong_test.rb} (70%) diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..ec92866 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,3 @@ +## 0.1.0 + +- First release. diff --git a/LICENSE.txt b/LICENSE.txt new file mode 100644 index 0000000..937ae6c --- /dev/null +++ b/LICENSE.txt @@ -0,0 +1,22 @@ +Copyright (c) 2015 Nick Elser + +MIT License + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md index d8e364f..c3310f4 100644 --- a/README.md +++ b/README.md @@ -1,39 +1,49 @@ # Zhong -Welcome to your new gem! In this directory, you'll find the files you need to be able to package up your Ruby library into a gem. Put your Ruby code in the file `lib/zhong`. To experiment with that code, run `bin/console` for an interactive prompt. +Useful, reliable distributed cron. -TODO: Delete this and the text above, and describe your gem +# Installation -## Installation - -Add this line to your application's Gemfile: +Add this line to your application’s Gemfile: ```ruby gem 'zhong' ``` -And then execute: - - $ bundle - -Or install it yourself as: - - $ gem install zhong - ## Usage -TODO: Write usage instructions here +```ruby +r = Redis.new + +Zhong.schedule(redis: r) do + category "stuff" do + every(5.seconds, "foo") { puts "foo" } + every(1.week, "baz", at: "mon 22:45") { puts "baz" } + end + + category "clutter" do + every(1.second, "compute", if: -> (t) { rand < 0.5 }) { puts "something happened" } + end +end +``` -## Development +## TODO + - better logging + - error handling + - tests + - examples + - callbacks + - generic handler -After checking out the repo, run `bin/setup` to install dependencies. Then, run `bin/console` for an interactive prompt that will allow you to experiment. +## History -To install this gem onto your local machine, run `bundle exec rake install`. To release a new version, update the version number in `version.rb`, and then run `bundle exec rake release` to create a git tag for the version, push git commits and tags, and push the `.gem` file to [rubygems.org](https://rubygems.org). +View the [changelog](https://github.com/nickelser/zhong/blob/master/CHANGELOG.md). ## Contributing -1. Fork it ( https://github.com/[my-github-username]/zhong/fork ) -2. Create your feature branch (`git checkout -b my-new-feature`) -3. Commit your changes (`git commit -am 'Add some feature'`) -4. Push to the branch (`git push origin my-new-feature`) -5. Create a new Pull Request +Everyone is encouraged to help improve this project. Here are a few ways you can help: + +- [Report bugs](https://github.com/nickelser/zhong/issues) +- Fix bugs and [submit pull requests](https://github.com/nickelser/zhong/pulls) +- Write, clarify, or fix documentation +- Suggest or add new features diff --git a/Rakefile b/Rakefile index 2995527..5cda719 100644 --- a/Rakefile +++ b/Rakefile @@ -1 +1,8 @@ require "bundler/gem_tasks" +require "rake/testtask" + +task default: :test +Rake::TestTask.new do |t| + t.libs << "test" + t.pattern = "test/**/*_test.rb" +end diff --git a/lib/zhong.rb b/lib/zhong.rb index 24e639f..8f64cc2 100644 --- a/lib/zhong.rb +++ b/lib/zhong.rb @@ -1,218 +1,22 @@ -require "zhong/version" -require "monitor" require "logger" require "redis" require "suo" +require "active_support/time" -module Zhong - class Job - attr_reader :description, :name, :category - - def initialize(manager:, name:, every:, description: nil, category: nil, &block) - @every = every - @description = description - @category = category - @block = block - @redis = manager.config[:redis] - @logger = manager.config[:logger] - @name = name - @lock = Suo::Client::Redis.new(lock_key, client: @redis) - @timeout = 5 - - refresh_last_ran - end - - def run?(time = Time.now) - !@last_ran || next_run_at < time - end - - def run(time = Time.now) - return unless run?(time) - - if running? - @logger.info "already running: #{@name}" - return - end - - ran_set = @lock.lock do - refresh_last_ran - - break unless run?(time) - - if disabled? - @logger.info "disabled: #{@name}" - break - end - - @logger.info "running: #{@name}" - - @thread = Thread.new { @block.call } if @block - - ran!(time) - end - - @logger.info "unable to acquire exclusive run lock: #{@name}" unless ran_set - end - - def stop - return unless running? - Thread.new { @logger.error "killing #{@name} due to stop" } # thread necessary due to trap context - @thread.join(@timeout) - @thread.kill - end - - def running? - @thread && @thread.alive? - end - - def next_run_at - @last_ran ? (@last_ran + @every) : (Time.now - 0.001) - end - - def refresh_last_ran - last_ran_val = @redis.get(run_time_key) - @last_ran = last_ran_val ? Time.at(last_ran_val.to_i) : nil - end - - def disabled? - !!@redis.get(disabled_key) - end - - private - - def ran!(time) - @last_ran = time - @redis.set(run_time_key, @last_ran.to_i) - end - - def run_time_key - "zhong:last_ran:#{@name}" - end - - def disabled_key - "zhong:disabled:#{@name}" - end - - def lock_key - "zhong:lock:#{@name}" - end - end - - class Manager - attr_reader :config, :redis - - def initialize(config = {}) - @jobs = [] - @config = {timeout: 0.5, tz: "UTC"}.merge(config) - @logger = @config[:logger] ||= default_logger - @redis = @config[:redis] ||= Redis.new - end - - def start - %w(QUIT INT TERM).each do |sig| - Signal.trap(sig) { stop } - end - - @logger.info "starting" - - loop do - tick - - break if @stop - end - end - - def stop - Thread.new { @logger.error "stopping" } # thread necessary due to trap context - @stop = true - @jobs.each(&:stop) - Thread.new { @logger.info "stopped" } - end - - def add(job) - @jobs << job - end - - def tick - now = redis_time - - @jobs.each { |job| job.run(now) } - - sleep(interval) - end - - def interval - 1.0 - Time.now.subsec + 0.001 - end +require "zhong/version" - def redis_time - s, ms = @redis.time # returns [seconds since epoch, microseconds] - Time.at(s + ms / (10**6)) - end +require "zhong/at" +require "zhong/every" - def default_logger - Logger.new(STDOUT).tap do |logger| - logger.formatter = -> (_, datetime, _, msg) { "#{datetime}: #{msg}\n" } - end - end - end +require "zhong/job" +require "zhong/scheduler" +module Zhong class << self - def included(klass) - klass.send "include", Methods - klass.extend Methods - end - - def manager - @manager ||= Manager.new - end - - def manager=(manager) - @manager = manager - end - end - - module Methods - def configure(&block) - self.manager.configure(&block) - end - - # def handler(&block) - # self.manager.handler(&block) - # end - - # def error_handler(&block) - # self.manager.error_handler(&block) - # end - - def on(event, options={}, &block) - self.manager.on(event, options, &block) - end - - def every(period, job, options={}, &block) - self.manager.every(period, job, options, &block) - end - - def run - self.manager.run + def schedule(**opts, &block) + @scheduler = Scheduler.new(opts) + @scheduler.instance_eval(&block) + @scheduler.start end end - - extend Methods end - - -r = Redis.new - -x = Zhong::Manager.new(redis: r) - -j = Zhong::Job.new(manager: x, name: "j1", every: 10) { puts "FUCK THIS SHIT YOLOOOOO" } -j2 = Zhong::Job.new(manager: x, name: "j2", every: 15) { puts "FUCK UuuuuuuuUUUUUU" } -j3 = Zhong::Job.new(manager: x, name: "j3", every: 10) { puts "FUCK THIS SHIT !!!!!!!!!!!!!!!!!!!!!!!!!!!!!" } -j4 = Zhong::Job.new(manager: x, name: "j4", every: 5) { sleep 8; puts "RAN FUCK SHIT" } - -x.add(j) -x.add(j2) -x.add(j3) -x.add(j4) -x.start diff --git a/lib/zhong/at.rb b/lib/zhong/at.rb new file mode 100644 index 0000000..43ff4c1 --- /dev/null +++ b/lib/zhong/at.rb @@ -0,0 +1,70 @@ +module Zhong + # Strongly inspired by the Clockwork At class + class At + class FailedToParse < StandardError; end + + WDAYS = %w(sunday monday tuesday wednesday thursday friday saturday).each.with_object({}).with_index do |(w, wdays), index| + [w, w[0...3]].each do |k| + wdays[k] = index + + if k == "tue" + wdays["tues"] = index + elsif k == "thu" + wdays["thr"] = index + end + end + end.freeze + + attr_accessor :minute, :hour, :wday + + def initialize(minute: nil, hour: nil, wday: nil, grace: 0.minutes) + @minute = minute + @hour = hour + @wday = wday + @grace = grace + end + + def next_at(time = Time.now) + at_time = @wday.nil? ? time.dup : (time + (@wday - time.wday).days) + + at_time = at_time.change(hour: @hour, min: @minute) + + if at_time < @grace.ago + if @wday.nil? + at_time += 1.day + else + at_time += 1.week + end + else + at_time + end + end + + def self.parse(at, grace: 0) + return unless at + + case at + when /\A([[:alpha:]]+)\s+(.*)\z/ + wday = WDAYS[$1] + + if wday + parsed_time = parse($2, grace: grace) + parsed_time.wday = wday + parsed_time + else + fail FailedToParse, at + end + when /\A(\d{1,2}):(\d\d)\z/ + new(minute: $2.to_i, hour: $1.to_i, grace: grace) + when /\A\*{1,2}:(\d\d)\z/ + new(minute: $1.to_i, grace: grace) + when /\A(\d{1,2}):\*{1,2}\z/ + new(hour: $1, grace: grace) + else + fail FailedToParse, at + end + rescue ArgumentError + throw FailedToParse, at + end + end +end diff --git a/lib/zhong/every.rb b/lib/zhong/every.rb new file mode 100644 index 0000000..f05ced7 --- /dev/null +++ b/lib/zhong/every.rb @@ -0,0 +1,39 @@ +module Zhong + class Every + class FailedToParse < StandardError; end + + EVERY_KEYWORDS = { + day: 1.day, + week: 1.week, + month: 1.month, + semiannual: 6.months, # enterprise! + year: 1.year, + decade: 10.year + }.freeze + + def initialize(period) + @period = period + end + + def next_at(last = Time.now) + last + @period + end + + def self.parse(every) + return unless every + + case every + when Numeric, ActiveSupport::Duration + new(every) + when String, Symbol + key = every.downcase.to_sym + + fail FailedToParse, every unless EVERY_KEYWORDS.key?(key) + + new(EVERY_KEYWORDS[key]) + else + fail FailedToParse, every + end + end + end +end diff --git a/lib/zhong/job.rb b/lib/zhong/job.rb new file mode 100644 index 0000000..15bb78c --- /dev/null +++ b/lib/zhong/job.rb @@ -0,0 +1,122 @@ +module Zhong + class Job + attr_reader :name, :category + + def initialize(scheduler:, name:, every: nil, at: nil, only_if: nil, category: nil, &block) + @name = name + @category = category + + @at = At.parse(at, grace: scheduler.config[:grace]) + @every = Every.parse(every) + + if @at && !@every + @logger.error "warning: #{self} has `at` but no `every`; could run far more often than expected!" + end + + @block = block + @redis = scheduler.config[:redis] + @logger = scheduler.config[:logger] + @tz = scheduler.config[:tz] + @if = only_if + @lock = Suo::Client::Redis.new(lock_key, client: @redis, stale_lock_expiration: scheduler.config[:long_running_timeout]) + @timeout = 5 + + refresh_last_ran + end + + def run?(time = Time.now) + run_every?(time) && run_at?(time) && run_if?(time) + end + + def run(time = Time.now) + return unless run?(time) + + if running? + @logger.info "already running: #{self}" + return + end + + ran_set = @lock.lock do + refresh_last_ran + + break true unless run?(time) + + if disabled? + @logger.info "disabled: #{self}" + break true + end + + @logger.info "running: #{self}" + + @thread = Thread.new { @block.call } if @block + + ran!(time) + end + + @logger.info "unable to acquire exclusive run lock: #{self}" unless ran_set + end + + def stop + return unless running? + Thread.new { @logger.error "killing #{self} due to stop" } # thread necessary due to trap context + @thread.join(@timeout) + @thread.kill + end + + def running? + @thread && @thread.alive? + end + + def refresh_last_ran + last_ran_val = @redis.get(run_time_key) + @last_ran = last_ran_val ? Time.at(last_ran_val.to_i) : nil + end + + def disable + @redis.set(disabled_key, "true") + end + + def enable + @redis.del(disabled_key) + end + + def disabled? + !!@redis.get(disabled_key) + end + + def to_s + [@category, @name].compact.join(".") + end + + private + + def run_every?(time) + !@last_ran || !@every || @every.next_at(@last_ran) <= time + end + + def run_at?(time) + !@at || @at.next_at(time) <= time + end + + def run_if?(time) + !@if || @if.call(time) + end + + def ran!(time) + @last_ran = time + @redis.set(run_time_key, @last_ran.to_i) + end + + def run_time_key + "zhong:last_ran:#{self}" + end + + def disabled_key + "zhong:disabled:#{self}" + end + + def lock_key + "zhong:lock:#{self}" + end + end +end diff --git a/lib/zhong/scheduler.rb b/lib/zhong/scheduler.rb new file mode 100644 index 0000000..11db394 --- /dev/null +++ b/lib/zhong/scheduler.rb @@ -0,0 +1,76 @@ +module Zhong + class Scheduler + attr_reader :config, :redis, :jobs + + def initialize(config = {}) + @jobs = {} + @config = {timeout: 0.5, grace: 15.minutes, long_running_timeout: 5.minutes}.merge(config) + @logger = @config[:logger] ||= self.class.default_logger + @redis = @config[:redis] ||= Redis.new + end + + def category(name) + @category = name + + yield + + @category = nil + end + + def every(period, name, opts = {}, &block) + add(Job.new(scheduler: self, name: name, every: period, at: opts[:at], only_if: opts[:if], category: @category, &block)) + end + + def start + %w(QUIT INT TERM).each do |sig| + Signal.trap(sig) { stop } + end + + @logger.info "starting at #{redis_time}" + + loop do + now = redis_time + + jobs.each { |_, job| job.run(now) } + + sleep(interval) + + break if @stop + end + end + + def stop + Thread.new { @logger.error "stopping" } # thread necessary due to trap context + @stop = true + jobs.values.each(&:stop) + Thread.new { @logger.info "stopped" } + end + + private + + def add(job) + if @jobs.key?(job.to_s) + @logger.error "duplicate job #{job}, skipping" + return + end + + @jobs[job.to_s] = job + end + + def interval + 1.0 - Time.now.subsec + 0.001 + end + + def redis_time + s, ms = @redis.time # returns [seconds since epoch, microseconds] + now = Time.at(s + ms / (10**6)) + config[:tz] ? now.in_time_zone(config[:tz]) : now + end + + def self.default_logger + Logger.new(STDOUT).tap do |logger| + logger.formatter = -> (_, datetime, _, msg) { "#{datetime}: #{msg}\n" } + end + end + end +end diff --git a/test/test_zhong.rb b/test/zhong_test.rb similarity index 70% rename from test/test_zhong.rb rename to test/zhong_test.rb index 0dcec95..95b8d00 100644 --- a/test/test_zhong.rb +++ b/test/zhong_test.rb @@ -4,8 +4,4 @@ class TestZhong < Minitest::Test def test_that_it_has_a_version_number refute_nil ::Zhong::VERSION end - - def test_it_does_something_useful - assert false - end end diff --git a/zhong.gemspec b/zhong.gemspec index 5eade29..21e59fa 100644 --- a/zhong.gemspec +++ b/zhong.gemspec @@ -1,7 +1,7 @@ # coding: utf-8 -lib = File.expand_path('../lib', __FILE__) +lib = File.expand_path("../lib", __FILE__) $LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib) -require 'zhong/version' +require "zhong/version" Gem::Specification.new do |spec| spec.name = "zhong" @@ -9,9 +9,9 @@ Gem::Specification.new do |spec| spec.authors = ["Nick Elser"] spec.email = ["nick.elser@gmail.com"] - spec.summary = %q{TODO: Write a short summary, because Rubygems requires one.} - spec.description = %q{TODO: Write a longer description or delete this line.} - spec.homepage = "TODO: Put your gem's website or public repo URL here." + spec.summary = %q{Reliable, distributed cron.} + spec.description = %q{Reliable, distributed cron.} + spec.homepage = "https://www.github.com/nickelser/zhong" spec.license = "MIT" spec.files = `git ls-files -z`.split("\x0") @@ -20,10 +20,12 @@ Gem::Specification.new do |spec| spec.test_files = spec.files.grep(%r{^(test|spec|features)/}) spec.require_paths = ["lib"] - spec.required_ruby_version = "~> 2.0" + spec.required_ruby_version = "~> 2.1" spec.add_dependency "suo" spec.add_dependency "redis" + spec.add_dependency "tzinfo" + spec.add_dependency "activesupport" spec.add_development_dependency "bundler", "~> 1.5" spec.add_development_dependency "rake", "~> 10.0"