From 32bdba324481b3a50c0e588de9c66fee93d725c4 Mon Sep 17 00:00:00 2001 From: avi_alima Date: Wed, 19 Aug 2015 18:08:48 +0300 Subject: [PATCH 1/3] Add ability to define minimal time between job executions to support multiple corno nodes, so two different nodes will not execute the same job --- Changes.md | 5 +++++ lib/crono/job.rb | 9 ++++++++- lib/crono/performer_proxy.rb | 10 ++++++++-- lib/crono/version.rb | 2 +- spec/job_spec.rb | 26 ++++++++++++++++++++++++++ spec/performer_proxy_spec.rb | 13 +++++++++++++ 6 files changed, 61 insertions(+), 4 deletions(-) diff --git a/Changes.md b/Changes.md index 84ed526..c3350c0 100644 --- a/Changes.md +++ b/Changes.md @@ -1,3 +1,8 @@ +0.9.1 +----------- +- Add ability to define minimal time between job executions to support multiple corno nodes, so two different nodes will not execute the same job + + 0.8.9 ----------- diff --git a/lib/crono/job.rb b/lib/crono/job.rb index a292c54..b1557b0 100644 --- a/lib/crono/job.rb +++ b/lib/crono/job.rb @@ -7,9 +7,10 @@ module Crono include Logging attr_accessor :performer, :period, :last_performed_at, - :next_performed_at, :job_log, :job_logger, :healthy + :next_performed_at, :job_log, :job_logger, :healthy, :execution_interval def initialize(performer, period) + self.execution_interval = 0.minutes self.performer, self.period = performer, period self.job_log = StringIO.new self.job_logger = Logger.new(job_log) @@ -31,6 +32,8 @@ module Crono end def perform + return Thread.new {} if perform_before_interval? + log "Perform #{performer}" self.last_performed_at = Time.now self.next_performed_at = period.next(since: last_performed_at) @@ -102,5 +105,9 @@ module Crono def model @model ||= Crono::CronoJob.find_or_create_by(job_id: job_id) end + + def perform_before_interval? + self.last_performed_at.present? && self.last_performed_at > execution_interval.ago + end end end diff --git a/lib/crono/performer_proxy.rb b/lib/crono/performer_proxy.rb index ba370da..e632e95 100644 --- a/lib/crono/performer_proxy.rb +++ b/lib/crono/performer_proxy.rb @@ -7,8 +7,14 @@ module Crono end def every(period, *args) - job = Job.new(@performer, Period.new(period, *args)) - @scheduler.add_job(job) + @job = Job.new(@performer, Period.new(period, *args)) + @scheduler.add_job(@job) + self + end + + def once_per(execution_interval) + @job.execution_interval = execution_interval if @job + self end end diff --git a/lib/crono/version.rb b/lib/crono/version.rb index ff013c9..ddd6785 100644 --- a/lib/crono/version.rb +++ b/lib/crono/version.rb @@ -1,3 +1,3 @@ module Crono - VERSION = '0.9.0' + VERSION = '0.9.1' end diff --git a/spec/job_spec.rb b/spec/job_spec.rb index 029e50c..1488a3b 100644 --- a/spec/job_spec.rb +++ b/spec/job_spec.rb @@ -36,6 +36,32 @@ describe Crono::Job do failing_job.perform.join expect(failing_job.healthy).to be false end + + it 'should execute one' do + job.execution_interval = 5.minutes + + expect(job).to receive(:perform_job).once + job.perform.join + thread = job.perform.join + expect(thread).to be_stop + end + + it 'should execute twice' do + job.execution_interval = 0.minutes + + test_preform_job_twice + end + + it 'should execute twice without initialize execution_interval' do + test_preform_job_twice + end + + def test_preform_job_twice + expect(job).to receive(:perform_job).twice + job.perform.join + thread = job.perform.join + expect(thread).to be_stop + end end describe '#description' do diff --git a/spec/performer_proxy_spec.rb b/spec/performer_proxy_spec.rb index 6e884fb..02f39e7 100644 --- a/spec/performer_proxy_spec.rb +++ b/spec/performer_proxy_spec.rb @@ -5,4 +5,17 @@ describe Crono::PerformerProxy do expect(Crono.scheduler).to receive(:add_job).with(kind_of(Crono::Job)) Crono.perform(TestJob).every(2.days, at: '15:30') end + + it 'should set execution interval' do + allow(Crono).to receive(:scheduler).and_return(Crono::Scheduler.new) + expect_any_instance_of(Crono::Job).to receive(:execution_interval=).with(0.minutes).once + expect_any_instance_of(Crono::Job).to receive(:execution_interval=).with(10.minutes).once + Crono.perform(TestJob).every(2.days, at: '15:30').once_per 10.minutes + end + + it 'do nothing when job not initalized' do + expect_any_instance_of(Crono::Job).not_to receive(:execution_interval=) + expect_any_instance_of(described_class).to receive(:once_per) + Crono.perform(TestJob).once_per 10.minutes + end end From 3a480a7d9a6190815049bfe68f186ca0ef5704d2 Mon Sep 17 00:00:00 2001 From: avi_alima Date: Thu, 20 Aug 2015 12:54:16 +0300 Subject: [PATCH 2/3] Add ability to define minimal time between job executions to support multiple corno nodes, so two different nodes will not execute the same job Add Locking for the case that two nodes start perform job together. --- lib/crono/job.rb | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/lib/crono/job.rb b/lib/crono/job.rb index b1557b0..feabe9b 100644 --- a/lib/crono/job.rb +++ b/lib/crono/job.rb @@ -107,7 +107,23 @@ module Crono end def perform_before_interval? - self.last_performed_at.present? && self.last_performed_at > execution_interval.ago + return true if self.last_performed_at.present? && self.last_performed_at > execution_interval.ago + return true if model.updated_at.present? && model.created_at != model.updated_at && model.updated_at > execution_interval.ago + + Crono::CronoJob.transaction do + job_record = Crono::CronoJob.where(job_id: job_id).lock(true).first + + return true if job_record.updated_at.present? && + job_record.updated_at != job_record.created_at && + job_record.updated_at > execution_interval.ago + + job_record.touch + + return true unless job_record.save + end + + # Means that this node is permit to perform the job. + return false end end end From e416113ac242e3c0f26e46ee06d06ce78d4a8117 Mon Sep 17 00:00:00 2001 From: avi_alima Date: Thu, 20 Aug 2015 13:35:41 +0300 Subject: [PATCH 3/3] Add ability to define minimal time between job executions to support multiple corno nodes, so two different nodes will not execute the same job Add Locking for the case that two nodes start perform job together. If execution_interval == 0.minutes, skip locking and immediately perform --- lib/crono/job.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/crono/job.rb b/lib/crono/job.rb index feabe9b..8a86b95 100644 --- a/lib/crono/job.rb +++ b/lib/crono/job.rb @@ -107,6 +107,8 @@ module Crono end def perform_before_interval? + return false if execution_interval == 0.minutes + return true if self.last_performed_at.present? && self.last_performed_at > execution_interval.ago return true if model.updated_at.present? && model.created_at != model.updated_at && model.updated_at > execution_interval.ago