Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions app/api/helpers/community_helpers.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require 'envelope_graph_sync'

# EnvelopeCommunity specific helpers
module CommunityHelpers
extend Grape::API::Helpers
Expand All @@ -21,6 +23,12 @@ def community_error(msg)
json_error! [msg], nil, :unprocessable_entity
end

# Guards publish endpoints: when EnvelopeGraphSync reports the current
# community as syncing, responds via json_error! with the PUBLISH_LOCKED
# message and status 503. Returns nil when no sync is in progress.
def assert_publish_unlocked!
  sync_in_progress = EnvelopeGraphSync.syncing?(current_community)

  json_error!([EnvelopeGraphSync::PUBLISH_LOCKED], nil, 503) if sync_in_progress
end

def normalized_community_names
[
params[:community_name].try(:underscore),
Expand Down
1 change: 1 addition & 0 deletions app/api/v1/publish.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ module Publish
end
post do
authorize Envelope, :create?
assert_publish_unlocked!

secondary_token_header = request.headers['Secondary-Token']
secondary_token = if secondary_token_header.present?
Expand Down
2 changes: 2 additions & 0 deletions app/api/v2/publish.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class Publish < MountableAPI # rubocop:todo Style/Documentation
use :update_if_exists
end
post do
assert_publish_unlocked!

secondary_token_header = request.headers['Secondary-Token']
secondary_token = (secondary_token_header.split.last if secondary_token_header.present?)

Expand Down
53 changes: 53 additions & 0 deletions app/jobs/sync_envelope_graphs_with_s3_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
require 'envelope_graph_sync'
require 'sync_pending_envelope_graphs_with_s3'

class SyncEnvelopeGraphsWithS3Job < ActiveJob::Base # rubocop:todo Style/Documentation
Comment thread
rohit-joy marked this conversation as resolved.
# Runs, or postpones, the S3 graph sync tracked by an EnvelopeGraphSync row.
#
# Under the row lock it snapshots the latest activity, exits early when
# nothing is scheduled and nothing is pending, and re-evaluates the debounce
# deadline (last activity + EnvelopeGraphSync.debounce_window). If that
# deadline is still in the future the job re-enqueues itself for it;
# otherwise it marks the row as syncing and calls
# SyncPendingEnvelopeGraphsWithS3 up to the snapshotted version id.
#
# @param sync_id [Integer] id of the EnvelopeGraphSync record
def perform(sync_id)
  sync = EnvelopeGraphSync.find(sync_id)
  activity_at = nil
  cutoff_version_id = nil
  retry_at = nil

  sync.with_lock do
    activity_at = sync.last_activity_at
    cutoff_version_id = sync.last_activity_version_id

    # True when activity has been recorded past the last synced version.
    behind = cutoff_version_id.present? &&
             (sync.last_synced_version_id.blank? ||
              sync.last_synced_version_id < cutoff_version_id)
    scheduled = sync.scheduled_for_at.present?

    return unless scheduled || behind

    if scheduled
      debounce_ends_at = activity_at + EnvelopeGraphSync.debounce_window
      # Keep the schedule only while the debounce deadline is still ahead;
      # otherwise clear it (retry_at stays nil) and fall through to the sync.
      retry_at = debounce_ends_at if debounce_ends_at > Time.current
      sync.update!(scheduled_for_at: retry_at)
    end
  end

  if retry_at
    self.class.set(wait_until: retry_at).perform_later(sync.id)
    return
  end

  return unless activity_at && cutoff_version_id
  # mark_syncing! returns false when another sync already holds the flag.
  return unless sync.mark_syncing!

  begin
    SyncPendingEnvelopeGraphsWithS3.new(
      envelope_community: sync.envelope_community,
      cutoff_version_id: cutoff_version_id,
      sync: sync
    ).call
  rescue StandardError => e
    # Record the failure on the row, then let the error propagate.
    sync.mark_sync_error!(e)
    raise
  ensure
    sync.clear_syncing!
  end
end
end
29 changes: 23 additions & 6 deletions app/models/envelope.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require 'authorized_key'
require 'export_to_ocn_job'
require 'delete_from_ocn_job'
require 'envelope_graph_sync'
require 'envelope_version'
require_relative 'extensions/transactionable_envelope'
require_relative 'extensions/learning_registry_resources'
Expand Down Expand Up @@ -46,10 +47,10 @@ class Envelope < ActiveRecord::Base
before_validation :process_resource, :process_headers
before_save :assign_last_verified_on
after_save :update_headers
after_save :upload_to_s3
after_commit :schedule_s3_upload, on: %i[create update]
before_destroy :delete_description_sets, prepend: true
after_destroy :delete_from_ocn
after_destroy :delete_from_s3
after_commit :schedule_s3_delete, on: :destroy
after_commit :export_to_ocn

validates :envelope_community, :envelope_type, :envelope_version,
Expand Down Expand Up @@ -271,11 +272,27 @@ def export_to_ocn
ExportToOCNJob.perform_later(id)
end

def upload_to_s3
SyncEnvelopeGraphWithS3.upload(self)
# after_commit hook (create/update): records activity for this envelope's
# community with EnvelopeGraphSync, passing the newest version id.
# Does nothing when the envelope has no CTID.
def schedule_s3_upload
  unless envelope_ceterms_ctid.blank?
    EnvelopeGraphSync.record_activity!(envelope_community, version_id: latest_s3_sync_version_id)
  end
end

def delete_from_s3
SyncEnvelopeGraphWithS3.remove(self)
# after_commit hook (destroy): records activity for this envelope's
# community with EnvelopeGraphSync, passing the newest version id.
# Does nothing when the CTID or the community is blank.
def schedule_s3_delete
  return if envelope_ceterms_ctid.blank?

  community = envelope_community
  return if community.blank?

  EnvelopeGraphSync.record_activity!(community, version_id: latest_s3_sync_version_id)
end

# Highest EnvelopeVersion id stored for this record (matched on item_type and
# item_id); nil when the query finds no versions.
def latest_s3_sync_version_id
  own_versions = EnvelopeVersion.where(item_type: self.class.name, item_id: id)
  own_versions.maximum(:id)
end
end
141 changes: 141 additions & 0 deletions app/models/envelope_graph_sync.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
# Tracks debounced S3 graph sync scheduling for an envelope community.
require 'sync_envelope_graphs_with_s3_job'

class EnvelopeGraphSync < ActiveRecord::Base
  # Error message returned to publishers while a sync holds the lock.
  PUBLISH_LOCKED = 'Publishing is temporarily locked while S3 sync is in progress'.freeze

  belongs_to :envelope_community

  validates :envelope_community_id, uniqueness: true
  validates :last_activity_at, presence: true

  class << self
    # Delay between the last recorded activity and the sync job run.
    # Read from ENVELOPE_GRAPH_SYNC_DEBOUNCE_SECONDS; defaults to 60 seconds.
    def debounce_window
      seconds_from_env('ENVELOPE_GRAPH_SYNC_DEBOUNCE_SECONDS', '60')
    end

    # Age after which a syncing flag is considered stale.
    # Read from ENVELOPE_GRAPH_SYNC_LOCK_TIMEOUT_SECONDS; defaults to 600 seconds.
    def sync_lock_timeout
      seconds_from_env('ENVELOPE_GRAPH_SYNC_LOCK_TIMEOUT_SECONDS', '600')
    end

    # Whether the given community's sync row reports a sync in progress.
    # Returns false when the community has no sync row.
    def syncing?(envelope_community)
      tracker = find_by(envelope_community: envelope_community)
      tracker ? tracker.syncing? : false
    end

    # Registers activity up to +version_id+ for the community and, when no
    # future run is already scheduled, enqueues SyncEnvelopeGraphsWithS3Job
    # one debounce window from now.
    #
    # Returns the sync row, or nil when +version_id+ is nil/false.
    def record_activity!(envelope_community, version_id:)
      return unless version_id

      tracker = find_or_create_by!(envelope_community: envelope_community) do |fresh|
        # Only runs for a newly created row: seed it with the version that
        # precedes the one being recorded.
        fresh.last_activity_at = Time.current
        fresh.last_synced_version_id = previous_version_id(envelope_community, version_id)
      end

      run_at = nil

      tracker.with_lock do
        current_time = Time.current
        tracker.last_activity_at = current_time
        # Never move the activity marker backwards.
        tracker.last_activity_version_id =
          [tracker.last_activity_version_id, version_id].compact.max

        already_queued = tracker.scheduled_for_at.present? &&
                         tracker.scheduled_for_at > current_time

        unless already_queued
          run_at = current_time + debounce_window
          tracker.scheduled_for_at = run_at
        end

        tracker.save!
      end

      # Enqueue after the lock block, and only when this call set the schedule.
      SyncEnvelopeGraphsWithS3Job.set(wait_until: run_at).perform_later(tracker.id) if run_at

      tracker
    end

    private

    # Reads +name+ from ENV (falling back to +fallback+ when blank) and
    # converts it to a duration in seconds.
    def seconds_from_env(name, fallback)
      (ENV[name].presence || fallback).to_i.seconds
    end

    # Highest Envelope version id in the community that is below +version_id+.
    def previous_version_id(envelope_community, version_id)
      community_versions = EnvelopeVersion.where(
        item_type: 'Envelope',
        envelope_community_id: envelope_community.id
      )

      community_versions.where('id < ?', version_id).maximum(:id)
    end
  end

  # Whether a sync is in progress. A stale syncing flag is cleared first, so
  # this predicate can write to the row.
  def syncing?
    if syncing
      clear_stale_sync! if stale_sync?
      syncing
    else
      false
    end
  end

  # Tries to claim the syncing flag under the row lock.
  # Returns true when claimed, false when a non-stale sync already holds it.
  def mark_syncing!
    with_lock do
      clear_stale_sync! if stale_sync?
      return false if syncing

      update!(syncing: true, syncing_started_at: Time.current, last_sync_error: nil)
    end

    true
  end

  # Releases the syncing flag and stamps the finish time.
  def clear_syncing!
    update!(syncing: false, syncing_started_at: nil, last_sync_finished_at: Time.current)
  end

  # Stores the error's class and message on the row.
  def mark_sync_error!(error)
    update!(last_sync_error: "#{error.class}: #{error.message}")
  end

  # Advances last_synced_version_id to +version_id+ (never backwards).
  # No-op when +version_id+ is nil/false.
  def mark_synced_through!(version_id)
    return unless version_id

    with_lock do
      newest_synced = [last_synced_version_id, version_id].compact.max
      update!(last_synced_version_id: newest_synced)
    end
  end

  # Truthy when the syncing flag has been held for at least sync_lock_timeout.
  def stale_sync?
    syncing &&
      syncing_started_at.present? &&
      syncing_started_at <= self.class.sync_lock_timeout.ago
  end

  # Logs a warning and releases a stale syncing flag, recording why it was
  # released. No-op when the flag is not stale.
  def clear_stale_sync!
    return unless stale_sync?

    labels = {
      envelope_community: envelope_community.name,
      syncing_started_at: syncing_started_at.iso8601,
      timeout_seconds: self.class.sync_lock_timeout.to_i
    }
    MR.log_with_labels(:warn, 'Clearing stale envelope graph sync lock', **labels)

    update!(
      syncing: false,
      syncing_started_at: nil,
      last_sync_finished_at: Time.current,
      last_sync_error: 'Stale sync lock cleared after timeout'
    )
  end
end
4 changes: 3 additions & 1 deletion app/services/submit_envelope_download_workflow.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
require 'argo_workflows_client'

class SubmitEnvelopeDownloadWorkflow
TEMPLATE_NAME = 's3-graphs-zip'.freeze

def self.call(envelope_download:)
new(envelope_download).call
end
Expand All @@ -16,7 +18,7 @@ def call
return envelope_download if workflow_already_started?

workflow = client.submit_workflow(
template_name: ENV.fetch('ARGO_WORKFLOWS_TEMPLATE_NAME'),
template_name: TEMPLATE_NAME,
generate_name: "#{community_name.tr('_', '-')}-download-",
parameters:
)
Expand Down
61 changes: 61 additions & 0 deletions app/services/submit_partial_graph_index_workflow.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
require 'argo_workflows_client'

# Submits the Argo workflow template named by TEMPLATE_NAME for an envelope
# community, passing a manifest key inside the graphs bucket as its input file.
class SubmitPartialGraphIndexWorkflow
  TEMPLATE_NAME = 'update-index-graphs-input-file-elasticsearch'.freeze

  # Convenience entry point: builds the service and runs #call.
  def self.call(envelope_community:, manifest_key:)
    new(envelope_community: envelope_community, manifest_key: manifest_key).call
  end

  attr_reader :envelope_community, :manifest_key

  # @param envelope_community [#name] community whose name prefixes the workflow name
  # @param manifest_key [String] passed to the workflow as 'input-file-key'
  def initialize(envelope_community:, manifest_key:)
    @envelope_community = envelope_community
    @manifest_key = manifest_key
  end

  # Submits the workflow through ArgoWorkflowsClient.
  #
  # @return the client's workflow response
  # @raise [RuntimeError] when the response carries no workflow name
  # @raise [KeyError] when a required environment variable is missing
  def call
    workflow = client.submit_workflow(
      template_name: TEMPLATE_NAME,
      generate_name: "#{community_name.tr('_', '-')}-partial-graph-index-",
      parameters: parameters
    )
    workflow_name = workflow.dig(:metadata, :name)
    raise 'Argo workflow submission did not return a workflow name' if workflow_name.blank?

    workflow
  end

  private

  def client
    @client ||= ArgoWorkflowsClient.new
  end

  def community_name
    envelope_community.name
  end

  # Image for the workflow task: the partial-graph-index override when set,
  # otherwise the general task image.
  #
  # The fallback is given as a block so it is only evaluated when the override
  # is absent. Passing ENV.fetch(...) as the default argument would evaluate
  # it eagerly and raise KeyError whenever ARGO_WORKFLOWS_TASK_IMAGE is unset,
  # even with the override configured.
  def task_image
    ENV.fetch('ARGO_WORKFLOWS_PARTIAL_GRAPH_INDEX_TASK_IMAGE') do
      ENV.fetch('ARGO_WORKFLOWS_TASK_IMAGE')
    end
  end

  # Workflow parameters; the graphs bucket serves as both input and source bucket.
  def parameters
    {
      'task-image' => task_image,
      'index-name' => ENV.fetch('ARGO_WORKFLOWS_PARTIAL_GRAPH_INDEX_NAME'),
      'input-bucket' => graphs_bucket,
      'input-file-key' => manifest_key,
      'source-bucket' => graphs_bucket,
      'prefix-path' => '',
      'aws-region' => ENV.fetch('AWS_REGION')
    }
  end

  def graphs_bucket
    ENV.fetch('ENVELOPE_GRAPHS_BUCKET')
  end
end
Loading
Loading