Merge pull request #36 from preisanalytics/jhuebl_add_data_to_scheduled_job_20160113

Add the possibility to schedule jobs with arguments
This commit is contained in:
Dzmitry Plashchynski
2016-01-16 01:18:22 +02:00
7 changed files with 55 additions and 17 deletions

View File

@@ -61,11 +61,11 @@ class TestJob < ActiveJob::Base
end end
``` ```
The ActiveJob jobs is convenient because you can use one job in both periodic and enqueued ways. But Active Job is not required. Any class can be used as a crono job if it implements a method `perform` without arguments: The ActiveJob jobs is convenient because you can use one job in both periodic and enqueued ways. But Active Job is not required. Any class can be used as a crono job if it implements a method `perform`:
```ruby ```ruby
class TestJob # This is not an Active Job job, but pretty legal Crono job. class TestJob # This is not an Active Job job, but pretty legal Crono job.
def perform def perform(*args)
# put you scheduled code here # put you scheduled code here
# Comments.deleted.clean_up... # Comments.deleted.clean_up...
end end
@@ -124,6 +124,13 @@ The `at` can be a Hash:
Crono.perform(TestJob).every 1.day, at: {hour: 12, min: 15} Crono.perform(TestJob).every 1.day, at: {hour: 12, min: 15}
``` ```
You can schedule a job with arguments, which can contain objects that can be
serialized using JSON.generate
```ruby
Crono.perform(TestJob, 'some', 'args').every 1.day, at: {hour: 12, min: 15}
```
#### Run daemon #### Run daemon
To run Crono daemon, in your Rails project root directory: To run Crono daemon, in your Rails project root directory:

View File

@@ -6,12 +6,13 @@ module Crono
class Job class Job
include Logging include Logging
attr_accessor :performer, :period, :last_performed_at, attr_accessor :performer, :period, :job_args, :last_performed_at,
:next_performed_at, :job_log, :job_logger, :healthy, :execution_interval :next_performed_at, :job_log, :job_logger, :healthy, :execution_interval
def initialize(performer, period) def initialize(performer, period, job_args)
self.execution_interval = 0.minutes self.execution_interval = 0.minutes
self.performer, self.period = performer, period self.performer, self.period = performer, period
self.job_args = JSON.generate(job_args)
self.job_log = StringIO.new self.job_log = StringIO.new
self.job_logger = Logger.new(job_log) self.job_logger = Logger.new(job_log)
self.next_performed_at = period.next self.next_performed_at = period.next
@@ -64,11 +65,11 @@ module Crono
saved_log = model.reload.log || '' saved_log = model.reload.log || ''
log_to_save = saved_log + job_log.string log_to_save = saved_log + job_log.string
model.update(last_performed_at: last_performed_at, log: log_to_save, model.update(last_performed_at: last_performed_at, log: log_to_save,
healthy: healthy) healthy: healthy, args: job_args)
end end
def perform_job def perform_job
performer.new.perform performer.new.perform *JSON.parse(job_args)
rescue StandardError => e rescue StandardError => e
handle_job_fail(e) handle_job_fail(e)
else else

View File

@@ -1,13 +1,14 @@
module Crono module Crono
# Crono::PerformerProxy is a proxy used in cronotab.rb semantic # Crono::PerformerProxy is a proxy used in cronotab.rb semantic
class PerformerProxy class PerformerProxy
def initialize(performer, scheduler) def initialize(performer, scheduler, job_args)
@performer = performer @performer = performer
@scheduler = scheduler @scheduler = scheduler
@job_args = job_args
end end
def every(period, *args) def every(period, *args)
@job = Job.new(@performer, Period.new(period, *args)) @job = Job.new(@performer, Period.new(period, *args), @job_args)
@scheduler.add_job(@job) @scheduler.add_job(@job)
self self
end end
@@ -18,7 +19,7 @@ module Crono
end end
end end
def self.perform(performer) def self.perform(performer, *job_args)
PerformerProxy.new(performer, Crono.scheduler) PerformerProxy.new(performer, Crono.scheduler, job_args)
end end
end end

View File

@@ -5,6 +5,7 @@ class CreateCronoJobs < ActiveRecord::Migration
t.text :log t.text :log
t.datetime :last_performed_at t.datetime :last_performed_at
t.boolean :healthy t.boolean :healthy
t.text :args
t.timestamps null: false t.timestamps null: false
end end
add_index :crono_jobs, [:job_id], unique: true add_index :crono_jobs, [:job_id], unique: true

View File

@@ -2,14 +2,20 @@ require 'spec_helper'
describe Crono::Job do describe Crono::Job do
let(:period) { Crono::Period.new(2.day, at: '15:00') } let(:period) { Crono::Period.new(2.day, at: '15:00') }
let(:job) { Crono::Job.new(TestJob, period) } let(:job_args) {[{some: 'data'}]}
let(:failing_job) { Crono::Job.new(TestFailingJob, period) } let(:job) { Crono::Job.new(TestJob, period, []) }
let(:job_with_args) { Crono::Job.new(TestJob, period, job_args) }
let(:failing_job) { Crono::Job.new(TestFailingJob, period, []) }
it 'should contain performer and period' do it 'should contain performer and period' do
expect(job.performer).to be TestJob expect(job.performer).to be TestJob
expect(job.period).to be period expect(job.period).to be period
end end
it 'should contain data as JSON String' do
expect(job_with_args.job_args).to eq '[{"some":"data"}]'
end
describe '#next' do describe '#next' do
it 'should return next performing time according to period' do it 'should return next performing time according to period' do
expect(job.next).to be_eql period.next expect(job.next).to be_eql period.next
@@ -56,6 +62,20 @@ describe Crono::Job do
test_preform_job_twice test_preform_job_twice
end end
it 'should call perform of performer' do
expect(TestJob).to receive(:new).with(no_args)
thread = job.perform.join
expect(thread).to be_stop
end
it 'should call perform of performer with data' do
test_job = double()
expect(TestJob).to receive(:new).and_return(test_job)
expect(test_job).to receive(:perform).with({'some' => 'data'})
thread = job_with_args.perform.join
expect(thread).to be_stop
end
def test_preform_job_twice def test_preform_job_twice
expect(job).to receive(:perform_job).twice expect(job).to receive(:perform_job).twice
job.perform.join job.perform.join
@@ -80,10 +100,12 @@ describe Crono::Job do
it 'should update saved job' do it 'should update saved job' do
job.last_performed_at = Time.now job.last_performed_at = Time.now
job.healthy = true job.healthy = true
job.job_args = JSON.generate([{some: 'data'}])
job.save job.save
@crono_job = Crono::CronoJob.find_by(job_id: job.job_id) @crono_job = Crono::CronoJob.find_by(job_id: job.job_id)
expect(@crono_job.last_performed_at.utc.to_s).to be_eql job.last_performed_at.utc.to_s expect(@crono_job.last_performed_at.utc.to_s).to be_eql job.last_performed_at.utc.to_s
expect(@crono_job.healthy).to be true expect(@crono_job.healthy).to be true
expect(@crono_job.args).to eq '[{"some":"data"}]'
end end
it 'should save and truncate job log' do it 'should save and truncate job log' do
@@ -102,7 +124,7 @@ describe Crono::Job do
end end
it 'should load last_performed_at from DB' do it 'should load last_performed_at from DB' do
@job = Crono::Job.new(TestJob, period) @job = Crono::Job.new(TestJob, period, job_args)
@job.load @job.load
expect(@job.last_performed_at.utc.to_s).to be_eql @saved_last_performed_at.utc.to_s expect(@job.last_performed_at.utc.to_s).to be_eql @saved_last_performed_at.utc.to_s
end end

View File

@@ -18,4 +18,10 @@ describe Crono::PerformerProxy do
expect_any_instance_of(described_class).to receive(:once_per) expect_any_instance_of(described_class).to receive(:once_per)
Crono.perform(TestJob).once_per 10.minutes Crono.perform(TestJob).once_per 10.minutes
end end
it 'should add job with args to schedule' do
expect(Crono::Job).to receive(:new).with(TestJob, kind_of(Crono::Period), [:some, {some: 'data'}])
allow(Crono.scheduler).to receive(:add_job)
Crono.perform(TestJob, :some, {some: 'data'}).every(2.days, at: '15:30')
end
end end

View File

@@ -5,7 +5,7 @@ describe Crono::Scheduler do
describe '#add_job' do describe '#add_job' do
it 'should call Job#load on Job' do it 'should call Job#load on Job' do
@job = Crono::Job.new(TestJob, Crono::Period.new(10.day, at: '04:05')) @job = Crono::Job.new(TestJob, Crono::Period.new(10.day, at: '04:05'), [])
expect(@job).to receive(:load) expect(@job).to receive(:load)
scheduler.add_job(@job) scheduler.add_job(@job)
end end
@@ -17,7 +17,7 @@ describe Crono::Scheduler do
Crono::Period.new(3.days, at: 10.minutes.from_now.strftime('%H:%M')), Crono::Period.new(3.days, at: 10.minutes.from_now.strftime('%H:%M')),
Crono::Period.new(1.day, at: 20.minutes.from_now.strftime('%H:%M')), Crono::Period.new(1.day, at: 20.minutes.from_now.strftime('%H:%M')),
Crono::Period.new(7.days, at: 40.minutes.from_now.strftime('%H:%M')) Crono::Period.new(7.days, at: 40.minutes.from_now.strftime('%H:%M'))
].map { |period| Crono::Job.new(TestJob, period) } ].map { |period| Crono::Job.new(TestJob, period, []) }
time, jobs = scheduler.next_jobs time, jobs = scheduler.next_jobs
expect(jobs).to be_eql [jobs[0]] expect(jobs).to be_eql [jobs[0]]
@@ -29,7 +29,7 @@ describe Crono::Scheduler do
Crono::Period.new(1.day, at: time.strftime('%H:%M')), Crono::Period.new(1.day, at: time.strftime('%H:%M')),
Crono::Period.new(1.day, at: time.strftime('%H:%M')), Crono::Period.new(1.day, at: time.strftime('%H:%M')),
Crono::Period.new(1.day, at: 10.minutes.from_now.strftime('%H:%M')) Crono::Period.new(1.day, at: 10.minutes.from_now.strftime('%H:%M'))
].map { |period| Crono::Job.new(TestJob, period) } ].map { |period| Crono::Job.new(TestJob, period, []) }
time, jobs = scheduler.next_jobs time, jobs = scheduler.next_jobs
expect(jobs).to be_eql [jobs[0], jobs[1]] expect(jobs).to be_eql [jobs[0], jobs[1]]
@@ -40,7 +40,7 @@ describe Crono::Scheduler do
Crono::Period.new(10.seconds), Crono::Period.new(10.seconds),
Crono::Period.new(10.seconds), Crono::Period.new(10.seconds),
Crono::Period.new(1.day, at: 10.minutes.from_now.strftime('%H:%M')) Crono::Period.new(1.day, at: 10.minutes.from_now.strftime('%H:%M'))
].map { |period| Crono::Job.new(TestJob, period) } ].map { |period| Crono::Job.new(TestJob, period, []) }
_, next_jobs = scheduler.next_jobs _, next_jobs = scheduler.next_jobs
expect(next_jobs).to be_eql [jobs[0]] expect(next_jobs).to be_eql [jobs[0]]