From c9b90d380d2211171389bd2d95d3bf6ad062c28b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matija=20=C4=8Cupi=C4=87?= Date: Sun, 14 Jan 2024 01:40:01 +0100 Subject: [PATCH 1/5] Implement job batches --- README.md | 18 ++++++++ lib/activejob-status.rb | 1 + lib/activejob-status/batch.rb | 23 ++++++++++ spec/specs/active_job/status/batch_spec.rb | 52 ++++++++++++++++++++++ 4 files changed, 94 insertions(+) create mode 100644 lib/activejob-status/batch.rb create mode 100644 spec/specs/active_job/status/batch_spec.rb diff --git a/README.md b/README.md index 0c0e76b..0f78d46 100644 --- a/README.md +++ b/README.md @@ -324,6 +324,24 @@ class MyJob < ActiveJob::Base end ``` +### Grouping jobs into batches + +```ruby +job = MyJob.perform_later +other_job = OtherJob.perform_later + +batch = ActiveJob::Status::Batch.new(job, other_job) +batch.status +# "queued" +``` + +The batch status can be `queued`, `failed`, `completed` or `working`. + +1. The batch is considered `queued` if **all** of the jobs are `queued` +2. The batch is considered `failed` if **one** of the jobs is `failed` +3. The batch is considered `completed` if **all** of the jobs are `completed` +4. The batch is considered `working` in all other circumstances + ## ActiveJob::Status and exceptions Internally, ActiveJob::Status uses `ActiveSupport#rescue_from` to catch every `Exception` to apply the `failed` status diff --git a/lib/activejob-status.rb b/lib/activejob-status.rb index 16ca5c5..c263ff7 100644 --- a/lib/activejob-status.rb +++ b/lib/activejob-status.rb @@ -8,6 +8,7 @@ require "activejob-status/status" require "activejob-status/progress" require "activejob-status/throttle" +require "activejob-status/batch" module ActiveJob module Status diff --git a/lib/activejob-status/batch.rb b/lib/activejob-status/batch.rb new file mode 100644 index 0000000..3c37001 --- /dev/null +++ b/lib/activejob-status/batch.rb @@ -0,0 +1,23 @@ +# frozen_string_literal: true + +module ActiveJob + module Status + class Batch + def initialize(*jobs) + @statuses = jobs.map { |job| ActiveJob::Status.get(job) } + end + + def status + if @statuses.all? { |status| status[:status] == :queued } + :queued + elsif @statuses.any? { |status| status[:status] == :failed } + :failed + elsif @statuses.all? { |status| status[:status] == :completed } + :completed + else + :working + end + end + end + end +end diff --git a/spec/specs/active_job/status/batch_spec.rb b/spec/specs/active_job/status/batch_spec.rb new file mode 100644 index 0000000..884cffa --- /dev/null +++ b/spec/specs/active_job/status/batch_spec.rb @@ -0,0 +1,52 @@ +# frozen_string_literal: true + +require_relative "../../../spec_helper" +require_relative "../../../jobs/test_jobs" + +RSpec.describe ActiveJob::Status::Batch do + describe "#status" do + it "returns queued when all jobs are queued" do + first_job = BaseJob.perform_later + second_job = BaseJob.perform_later + batch = described_class.new(first_job, second_job) + + ActiveJob::Status.get(first_job).update(status: :queued) + ActiveJob::Status.get(second_job).update(status: :queued) + + expect(batch.status).to eq(:queued) + end + + it "returns failed when one job is failed" do + first_job = BaseJob.perform_later + second_job = BaseJob.perform_later + batch = described_class.new(first_job, second_job) + + ActiveJob::Status.get(first_job).update(status: :failed) + ActiveJob::Status.get(second_job).update(status: :completed) + + expect(batch.status).to eq(:failed) + end + + it "returns completed when all jobs are completed" do + first_job = BaseJob.perform_later + second_job = BaseJob.perform_later + batch = described_class.new(first_job, second_job) + + ActiveJob::Status.get(first_job).update(status: :completed) + ActiveJob::Status.get(second_job).update(status: :completed) + + expect(batch.status).to eq(:completed) + end + + it "returns working in other cases" do + first_job = BaseJob.perform_later + second_job = BaseJob.perform_later + batch = described_class.new(first_job, second_job) + + ActiveJob::Status.get(first_job).update(status: :queued) + ActiveJob::Status.get(second_job).update(status: :working) + + expect(batch.status).to eq(:working) + end + end +end From 4ce106bacf34ad1fabccd5c877139cf05953b314 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matija=20=C4=8Cupi=C4=87?= Date: Thu, 25 Jan 2024 15:59:06 +0100 Subject: [PATCH 2/5] Document callbacks --- README.md | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/README.md b/README.md index 0f78d46..0db1f54 100644 --- a/README.md +++ b/README.md @@ -342,6 +342,34 @@ The batch status can be `queued`, `failed`, `completed` or `working`. 3. The batch is considered `completed` if **all** of the jobs are `completed` 4. The batch is considered `working` in all other circumstances +### Callbacks + +You can implement callbacks, by listening to the completion of a batch with a +simple ActiveJob job. + +```ruby +# frozen_string_literal: true + +require 'activejob-status' + +class CallbacksJob < ApplicationJob + queue_as :real_time + + def perform(*job_ids) + batch = ActiveJob::Status::Batch.new(*job_ids) + + case batch.status + when :queued, :working + MonitorAnalysisBatchJob.set(wait: 5.seconds).perform_later(*job_ids) + when :completed + # Completed callback + when :failed + # Failed callback + end + end +end +``` + ## ActiveJob::Status and exceptions Internally, ActiveJob::Status uses `ActiveSupport#rescue_from` to catch every `Exception` to apply the `failed` status From c6ca11ac17dbc9d6fc56ed1bec2aad2535e961f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matija=20=C4=8Cupi=C4=87?= Date: Sat, 27 Jan 2024 13:41:20 +0100 Subject: [PATCH 3/5] Pass jobs as an array --- README.md | 4 ++-- lib/activejob-status/batch.rb | 2 +- spec/specs/active_job/status/batch_spec.rb | 8 ++++---- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 0db1f54..c8825d4 100644 --- a/README.md +++ b/README.md @@ -330,7 +330,7 @@ end job = MyJob.perform_later other_job = OtherJob.perform_later -batch = ActiveJob::Status::Batch.new(job, other_job) +batch = ActiveJob::Status::Batch.new([job, other_job]) batch.status # "queued" ``` @@ -356,7 +356,7 @@ class CallbacksJob < ApplicationJob queue_as :real_time def perform(*job_ids) - batch = ActiveJob::Status::Batch.new(*job_ids) + batch = ActiveJob::Status::Batch.new(job_ids) case batch.status when :queued, :working diff --git a/lib/activejob-status/batch.rb b/lib/activejob-status/batch.rb index 3c37001..3027a1a 100644 --- a/lib/activejob-status/batch.rb +++ b/lib/activejob-status/batch.rb @@ -3,7 +3,7 @@ module ActiveJob module Status class Batch - def initialize(*jobs) + def initialize(jobs) @statuses = jobs.map { |job| ActiveJob::Status.get(job) } end diff --git a/spec/specs/active_job/status/batch_spec.rb b/spec/specs/active_job/status/batch_spec.rb index 884cffa..8704ec5 100644 --- a/spec/specs/active_job/status/batch_spec.rb +++ b/spec/specs/active_job/status/batch_spec.rb @@ -8,7 +8,7 @@ it "returns queued when all jobs are queued" do first_job = BaseJob.perform_later second_job = BaseJob.perform_later - batch = described_class.new(first_job, second_job) + batch = described_class.new([first_job, second_job]) ActiveJob::Status.get(first_job).update(status: :queued) ActiveJob::Status.get(second_job).update(status: :queued) @@ -19,7 +19,7 @@ it "returns failed when one job is failed" do first_job = BaseJob.perform_later second_job = BaseJob.perform_later - batch = described_class.new(first_job, second_job) + batch = described_class.new([first_job, second_job]) ActiveJob::Status.get(first_job).update(status: :failed) ActiveJob::Status.get(second_job).update(status: :completed) @@ -30,7 +30,7 @@ it "returns completed when all jobs are completed" do first_job = BaseJob.perform_later second_job = BaseJob.perform_later - batch = described_class.new(first_job, second_job) + batch = described_class.new([first_job, second_job]) ActiveJob::Status.get(first_job).update(status: :completed) ActiveJob::Status.get(second_job).update(status: :completed) @@ -41,7 +41,7 @@ it "returns working in other cases" do first_job = BaseJob.perform_later second_job = BaseJob.perform_later - batch = described_class.new(first_job, second_job) + batch = described_class.new([first_job, second_job]) ActiveJob::Status.get(first_job).update(status: :queued) ActiveJob::Status.get(second_job).update(status: :working) From 3843f029a9891b00f441f7bac094672d5a4d9e8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matija=20=C4=8Cupi=C4=87?= Date: Sat, 27 Jan 2024 14:45:30 +0100 Subject: [PATCH 4/5] Use read_multi for reading statuses --- lib/activejob-status/batch.rb | 19 +++++++++++++++---- lib/activejob-status/storage.rb | 4 ++++ 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/lib/activejob-status/batch.rb b/lib/activejob-status/batch.rb index 3027a1a..fb05efa 100644 --- a/lib/activejob-status/batch.rb +++ b/lib/activejob-status/batch.rb @@ -4,20 +4,31 @@ module ActiveJob module Status class Batch def initialize(jobs) - @statuses = jobs.map { |job| ActiveJob::Status.get(job) } + @jobs = jobs + @storage = ActiveJob::Status::Storage.new end def status - if @statuses.all? { |status| status[:status] == :queued } + if @jobs.all? { |job| status_for(job) == :queued } :queued - elsif @statuses.any? { |status| status[:status] == :failed } + elsif @jobs.any? { |job| status_for(job) == :failed } :failed - elsif @statuses.all? { |status| status[:status] == :completed } + elsif @jobs.all? { |job| status_for(job) == :completed } :completed else :working end end + + private + + def statuses + @statuses ||= @storage.read_multi(@jobs) + end + + def status_for(job) + statuses.dig(@storage.key(job), :status) + end end end end diff --git a/lib/activejob-status/storage.rb b/lib/activejob-status/storage.rb index da0f7d6..455fb73 100644 --- a/lib/activejob-status/storage.rb +++ b/lib/activejob-status/storage.rb @@ -26,6 +26,10 @@ def read(job) store.read(key(job)) || {} end + def read_multi(jobs) + store.read_multi(*jobs.map { |job| key(job) }) + end + def write(job, message, force: false) @throttle.wrap(force: force) do store.write(key(job), message, expires_in: @expires_in) From 754978809250ac78d1645be4f094cec43f8545a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matija=20=C4=8Cupi=C4=87?= Date: Sat, 27 Jan 2024 17:44:25 +0100 Subject: [PATCH 5/5] Standardize read_multi output --- lib/activejob-status/batch.rb | 21 +++++++++------------ lib/activejob-status/storage.rb | 4 +++- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/lib/activejob-status/batch.rb b/lib/activejob-status/batch.rb index fb05efa..d0d4801 100644 --- a/lib/activejob-status/batch.rb +++ b/lib/activejob-status/batch.rb @@ -9,26 +9,23 @@ def initialize(jobs) end def status - if @jobs.all? { |job| status_for(job) == :queued } - :queued - elsif @jobs.any? { |job| status_for(job) == :failed } + statuses = read.values.pluck(:status) + + if statuses.include?(:failed) :failed - elsif @jobs.all? { |job| status_for(job) == :completed } + elsif statuses.all?(:queued) + :queued + elsif statuses.all?(:completed) :completed else :working end end - private - - def statuses - @statuses ||= @storage.read_multi(@jobs) - end - - def status_for(job) - statuses.dig(@storage.key(job), :status) + def read + @storage.read_multi(@jobs) end + alias_method :to_h, :read end end end diff --git a/lib/activejob-status/storage.rb b/lib/activejob-status/storage.rb index 455fb73..d0c9235 100644 --- a/lib/activejob-status/storage.rb +++ b/lib/activejob-status/storage.rb @@ -27,7 +27,9 @@ def read(job) end def read_multi(jobs) - store.read_multi(*jobs.map { |job| key(job) }) + keys = jobs.map { |job| key(job) } + data = store.read_multi(*keys) + keys.index_with { |k| data.fetch(k, {}) } end def write(job, message, force: false)