Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions app/api/helpers/community_helpers.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require 'envelope_graph_sync'

# EnvelopeCommunity specific helpers
module CommunityHelpers
extend Grape::API::Helpers
Expand All @@ -21,6 +23,12 @@ def community_error(msg)
json_error! [msg], nil, :unprocessable_entity
end

def assert_publish_unlocked!
return unless EnvelopeGraphSync.syncing?(current_community)

json_error!([EnvelopeGraphSync::PUBLISH_LOCKED], nil, 503)
end

def normalized_community_names
[
params[:community_name].try(:underscore),
Expand Down
1 change: 1 addition & 0 deletions app/api/v1/publish.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ module Publish
end
post do
authorize Envelope, :create?
assert_publish_unlocked!

secondary_token_header = request.headers['Secondary-Token']
secondary_token = if secondary_token_header.present?
Expand Down
2 changes: 2 additions & 0 deletions app/api/v2/publish.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class Publish < MountableAPI # rubocop:todo Style/Documentation
use :update_if_exists
end
post do
assert_publish_unlocked!

secondary_token_header = request.headers['Secondary-Token']
secondary_token = (secondary_token_header.split.last if secondary_token_header.present?)

Expand Down
46 changes: 46 additions & 0 deletions app/jobs/sync_envelope_graphs_with_s3_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
require 'envelope_graph_sync'
require 'sync_pending_envelope_graphs_with_s3'

class SyncEnvelopeGraphsWithS3Job < ActiveJob::Base # rubocop:todo Style/Documentation
Comment thread
rohit-joy marked this conversation as resolved.
def perform(sync_id)
sync = EnvelopeGraphSync.find(sync_id)
last_activity_at = nil
last_activity_version_id = nil
wait_until = nil

sync.with_lock do
return if sync.scheduled_for_at.blank?

last_activity_at = sync.last_activity_at
last_activity_version_id = sync.last_activity_version_id
wait_until = last_activity_at + EnvelopeGraphSync.debounce_window

if wait_until > Time.current
sync.update!(scheduled_for_at: wait_until)
else
wait_until = nil
sync.update!(scheduled_for_at: nil)
end
end

if wait_until
self.class.set(wait_until: wait_until).perform_later(sync.id)
return
end

return unless last_activity_at && last_activity_version_id && sync.mark_syncing!

begin
SyncPendingEnvelopeGraphsWithS3.new(
envelope_community: sync.envelope_community,
cutoff_version_id: last_activity_version_id,
sync: sync
).call
rescue StandardError => e
sync.mark_sync_error!(e)
raise
ensure
sync.clear_syncing!
end
end
end
29 changes: 23 additions & 6 deletions app/models/envelope.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require 'authorized_key'
require 'export_to_ocn_job'
require 'delete_from_ocn_job'
require 'envelope_graph_sync'
require 'envelope_version'
require_relative 'extensions/transactionable_envelope'
require_relative 'extensions/learning_registry_resources'
Expand Down Expand Up @@ -46,10 +47,10 @@ class Envelope < ActiveRecord::Base
before_validation :process_resource, :process_headers
before_save :assign_last_verified_on
after_save :update_headers
after_save :upload_to_s3
after_commit :schedule_s3_upload, on: %i[create update]
before_destroy :delete_description_sets, prepend: true
after_destroy :delete_from_ocn
after_destroy :delete_from_s3
after_commit :schedule_s3_delete, on: :destroy
after_commit :export_to_ocn

validates :envelope_community, :envelope_type, :envelope_version,
Expand Down Expand Up @@ -271,11 +272,27 @@ def export_to_ocn
ExportToOCNJob.perform_later(id)
end

def upload_to_s3
SyncEnvelopeGraphWithS3.upload(self)
def schedule_s3_upload
return if envelope_ceterms_ctid.blank?

EnvelopeGraphSync.record_activity!(
envelope_community,
version_id: latest_s3_sync_version_id
)
end

def delete_from_s3
SyncEnvelopeGraphWithS3.remove(self)
def schedule_s3_delete
return if envelope_ceterms_ctid.blank? || envelope_community.blank?

EnvelopeGraphSync.record_activity!(
envelope_community,
version_id: latest_s3_sync_version_id
)
end

def latest_s3_sync_version_id
EnvelopeVersion
.where(item_type: self.class.name, item_id: id)
.maximum(:id)
end
end
141 changes: 141 additions & 0 deletions app/models/envelope_graph_sync.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
# Tracks debounced S3 graph sync scheduling for an envelope community.
require 'sync_envelope_graphs_with_s3_job'

class EnvelopeGraphSync < ActiveRecord::Base
PUBLISH_LOCKED = 'Publishing is temporarily locked while S3 sync is in progress'.freeze

belongs_to :envelope_community

validates :envelope_community_id, uniqueness: true
validates :last_activity_at, presence: true

class << self
def debounce_window
(ENV['ENVELOPE_GRAPH_SYNC_DEBOUNCE_SECONDS'].presence || '60').to_i.seconds
end

def sync_lock_timeout
(ENV['ENVELOPE_GRAPH_SYNC_LOCK_TIMEOUT_SECONDS'].presence || '600').to_i.seconds
end

def syncing?(envelope_community)
sync = find_by(envelope_community: envelope_community)
return false unless sync

sync.syncing?
end

def record_activity!(envelope_community, version_id:)
return unless version_id

sync = find_or_create_by!(envelope_community: envelope_community) do |record|
record.last_activity_at = Time.current
record.last_synced_version_id = previous_version_id(envelope_community, version_id)
end

wait_until = nil

sync.with_lock do
now = Time.current
sync.last_activity_at = now
sync.last_activity_version_id = [
sync.last_activity_version_id,
version_id
].compact.max

if sync.scheduled_for_at.blank? || sync.scheduled_for_at <= now
wait_until = now + debounce_window
sync.scheduled_for_at = wait_until
end

sync.save!
end

if wait_until
SyncEnvelopeGraphsWithS3Job.set(wait_until: wait_until).perform_later(sync.id)
end

sync
end

private

def previous_version_id(envelope_community, version_id)
EnvelopeVersion
.where(item_type: 'Envelope', envelope_community_id: envelope_community.id)
.where('id < ?', version_id)
.maximum(:id)
end
end

def syncing?
return false unless syncing

clear_stale_sync! if stale_sync?

syncing
end

def mark_syncing!
with_lock do
clear_stale_sync! if stale_sync?
return false if syncing

update!(
syncing: true,
syncing_started_at: Time.current,
last_sync_error: nil
)
end

true
end

def clear_syncing!
update!(
syncing: false,
syncing_started_at: nil,
last_sync_finished_at: Time.current
)
end

def mark_sync_error!(error)
update!(last_sync_error: "#{error.class}: #{error.message}")
end

def mark_synced_through!(version_id)
return unless version_id

with_lock do
update!(
last_synced_version_id: [
last_synced_version_id,
version_id
].compact.max
)
end
end

def stale_sync?
syncing && syncing_started_at.present? && syncing_started_at <= self.class.sync_lock_timeout.ago
end

def clear_stale_sync!
return unless stale_sync?

MR.log_with_labels(
:warn,
'Clearing stale envelope graph sync lock',
envelope_community: envelope_community.name,
syncing_started_at: syncing_started_at.iso8601,
timeout_seconds: self.class.sync_lock_timeout.to_i
)

update!(
syncing: false,
syncing_started_at: nil,
last_sync_finished_at: Time.current,
last_sync_error: 'Stale sync lock cleared after timeout'
)
end
end
4 changes: 3 additions & 1 deletion app/services/submit_envelope_download_workflow.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
require 'argo_workflows_client'

class SubmitEnvelopeDownloadWorkflow
TEMPLATE_NAME = 's3-graphs-zip'.freeze

def self.call(envelope_download:)
new(envelope_download).call
end
Expand All @@ -16,7 +18,7 @@ def call
return envelope_download if workflow_already_started?

workflow = client.submit_workflow(
template_name: ENV.fetch('ARGO_WORKFLOWS_TEMPLATE_NAME'),
template_name: TEMPLATE_NAME,
generate_name: "#{community_name.tr('_', '-')}-download-",
parameters:
)
Expand Down
61 changes: 61 additions & 0 deletions app/services/submit_partial_graph_index_workflow.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
require 'argo_workflows_client'

class SubmitPartialGraphIndexWorkflow
TEMPLATE_NAME = 'update-index-graphs-input-file-elasticsearch'.freeze

def self.call(envelope_community:, manifest_key:)
new(envelope_community: envelope_community, manifest_key: manifest_key).call
end

attr_reader :envelope_community, :manifest_key

def initialize(envelope_community:, manifest_key:)
@envelope_community = envelope_community
@manifest_key = manifest_key
end

def call
workflow = client.submit_workflow(
template_name: TEMPLATE_NAME,
generate_name: "#{community_name.tr('_', '-')}-partial-graph-index-",
parameters: parameters
)
workflow_name = workflow.dig(:metadata, :name)
raise 'Argo workflow submission did not return a workflow name' if workflow_name.blank?

workflow
end

private

def client
@client ||= ArgoWorkflowsClient.new
end

def community_name
envelope_community.name
end

def task_image
ENV.fetch(
'ARGO_WORKFLOWS_PARTIAL_GRAPH_INDEX_TASK_IMAGE',
ENV.fetch('ARGO_WORKFLOWS_TASK_IMAGE')
)
end

def parameters
{
'task-image' => task_image,
'index-name' => ENV.fetch('ARGO_WORKFLOWS_PARTIAL_GRAPH_INDEX_NAME'),
'input-bucket' => graphs_bucket,
'input-file-key' => manifest_key,
'source-bucket' => graphs_bucket,
'prefix-path' => '',
'aws-region' => ENV.fetch('AWS_REGION')
}
end

def graphs_bucket
ENV.fetch('ENVELOPE_GRAPHS_BUCKET')
end
end
Loading
Loading