diff --git a/app/api/helpers/community_helpers.rb b/app/api/helpers/community_helpers.rb index 30560468..6dee4a82 100644 --- a/app/api/helpers/community_helpers.rb +++ b/app/api/helpers/community_helpers.rb @@ -1,3 +1,5 @@ +require 'envelope_graph_sync' + # EnvelopeCommunity specific helpers module CommunityHelpers extend Grape::API::Helpers @@ -21,6 +23,12 @@ def community_error(msg) json_error! [msg], nil, :unprocessable_entity end + def assert_publish_unlocked! + return unless EnvelopeGraphSync.syncing?(current_community) + + json_error!([EnvelopeGraphSync::PUBLISH_LOCKED], nil, 503) + end + def normalized_community_names [ params[:community_name].try(:underscore), diff --git a/app/api/v1/publish.rb b/app/api/v1/publish.rb index 0e4ae79b..a8bf26f7 100644 --- a/app/api/v1/publish.rb +++ b/app/api/v1/publish.rb @@ -37,6 +37,7 @@ module Publish end post do authorize Envelope, :create? + assert_publish_unlocked! secondary_token_header = request.headers['Secondary-Token'] secondary_token = if secondary_token_header.present? diff --git a/app/api/v2/publish.rb b/app/api/v2/publish.rb index ff8fc30b..eb1caccc 100644 --- a/app/api/v2/publish.rb +++ b/app/api/v2/publish.rb @@ -41,6 +41,8 @@ class Publish < MountableAPI # rubocop:todo Style/Documentation use :update_if_exists end post do + assert_publish_unlocked! + secondary_token_header = request.headers['Secondary-Token'] secondary_token = (secondary_token_header.split.last if secondary_token_header.present?) diff --git a/app/jobs/sync_envelope_graphs_with_s3_job.rb b/app/jobs/sync_envelope_graphs_with_s3_job.rb new file mode 100644 index 00000000..b6519b58 --- /dev/null +++ b/app/jobs/sync_envelope_graphs_with_s3_job.rb @@ -0,0 +1,53 @@ +require 'envelope_graph_sync' +require 'sync_pending_envelope_graphs_with_s3' + +class SyncEnvelopeGraphsWithS3Job < ActiveJob::Base # rubocop:todo Style/Documentation + def perform(sync_id) + sync = EnvelopeGraphSync.find(sync_id) + last_activity_at = nil + last_activity_version_id = nil + pending_sync = false + wait_until = nil + + sync.with_lock do + last_activity_at = sync.last_activity_at + last_activity_version_id = sync.last_activity_version_id + pending_sync = last_activity_version_id.present? && + (sync.last_synced_version_id.blank? || + sync.last_synced_version_id < last_activity_version_id) + + return if sync.scheduled_for_at.blank? && !pending_sync + + if sync.scheduled_for_at.present? + wait_until = last_activity_at + EnvelopeGraphSync.debounce_window + + if wait_until > Time.current + sync.update!(scheduled_for_at: wait_until) + else + wait_until = nil + sync.update!(scheduled_for_at: nil) + end + end + end + + if wait_until + self.class.set(wait_until: wait_until).perform_later(sync.id) + return + end + + return unless last_activity_at && last_activity_version_id && sync.mark_syncing! + + begin + SyncPendingEnvelopeGraphsWithS3.new( + envelope_community: sync.envelope_community, + cutoff_version_id: last_activity_version_id, + sync: sync + ).call + rescue StandardError => e + sync.mark_sync_error!(e) + raise + ensure + sync.clear_syncing! + end + end +end diff --git a/app/models/envelope.rb b/app/models/envelope.rb index 0919185f..92afaf0b 100644 --- a/app/models/envelope.rb +++ b/app/models/envelope.rb @@ -3,6 +3,7 @@ require 'authorized_key' require 'export_to_ocn_job' require 'delete_from_ocn_job' +require 'envelope_graph_sync' require 'envelope_version' require_relative 'extensions/transactionable_envelope' require_relative 'extensions/learning_registry_resources' @@ -46,10 +47,10 @@ class Envelope < ActiveRecord::Base before_validation :process_resource, :process_headers before_save :assign_last_verified_on after_save :update_headers - after_save :upload_to_s3 + after_commit :schedule_s3_upload, on: %i[create update] before_destroy :delete_description_sets, prepend: true after_destroy :delete_from_ocn - after_destroy :delete_from_s3 + after_commit :schedule_s3_delete, on: :destroy after_commit :export_to_ocn validates :envelope_community, :envelope_type, :envelope_version, @@ -271,11 +272,27 @@ def export_to_ocn ExportToOCNJob.perform_later(id) end - def upload_to_s3 - SyncEnvelopeGraphWithS3.upload(self) + def schedule_s3_upload + return if envelope_ceterms_ctid.blank? + + EnvelopeGraphSync.record_activity!( + envelope_community, + version_id: latest_s3_sync_version_id + ) end - def delete_from_s3 - SyncEnvelopeGraphWithS3.remove(self) + def schedule_s3_delete + return if envelope_ceterms_ctid.blank? || envelope_community.blank? + + EnvelopeGraphSync.record_activity!( + envelope_community, + version_id: latest_s3_sync_version_id + ) + end + + def latest_s3_sync_version_id + EnvelopeVersion + .where(item_type: self.class.name, item_id: id) + .maximum(:id) end end diff --git a/app/models/envelope_graph_sync.rb b/app/models/envelope_graph_sync.rb new file mode 100644 index 00000000..21847160 --- /dev/null +++ b/app/models/envelope_graph_sync.rb @@ -0,0 +1,141 @@ +# Tracks debounced S3 graph sync scheduling for an envelope community. +require 'sync_envelope_graphs_with_s3_job' + +class EnvelopeGraphSync < ActiveRecord::Base + PUBLISH_LOCKED = 'Publishing is temporarily locked while S3 sync is in progress'.freeze + + belongs_to :envelope_community + + validates :envelope_community_id, uniqueness: true + validates :last_activity_at, presence: true + + class << self + def debounce_window + (ENV['ENVELOPE_GRAPH_SYNC_DEBOUNCE_SECONDS'].presence || '60').to_i.seconds + end + + def sync_lock_timeout + (ENV['ENVELOPE_GRAPH_SYNC_LOCK_TIMEOUT_SECONDS'].presence || '600').to_i.seconds + end + + def syncing?(envelope_community) + sync = find_by(envelope_community: envelope_community) + return false unless sync + + sync.syncing? + end + + def record_activity!(envelope_community, version_id:) + return unless version_id + + sync = find_or_create_by!(envelope_community: envelope_community) do |record| + record.last_activity_at = Time.current + record.last_synced_version_id = previous_version_id(envelope_community, version_id) + end + + wait_until = nil + + sync.with_lock do + now = Time.current + sync.last_activity_at = now + sync.last_activity_version_id = [ + sync.last_activity_version_id, + version_id + ].compact.max + + if sync.scheduled_for_at.blank? || sync.scheduled_for_at <= now + wait_until = now + debounce_window + sync.scheduled_for_at = wait_until + end + + sync.save! + end + + if wait_until + SyncEnvelopeGraphsWithS3Job.set(wait_until: wait_until).perform_later(sync.id) + end + + sync + end + + private + + def previous_version_id(envelope_community, version_id) + EnvelopeVersion + .where(item_type: 'Envelope', envelope_community_id: envelope_community.id) + .where('id < ?', version_id) + .maximum(:id) + end + end + + def syncing? + return false unless syncing + + clear_stale_sync! if stale_sync? + + syncing + end + + def mark_syncing! + with_lock do + clear_stale_sync! if stale_sync? + return false if syncing + + update!( + syncing: true, + syncing_started_at: Time.current, + last_sync_error: nil + ) + end + + true + end + + def clear_syncing! + update!( + syncing: false, + syncing_started_at: nil, + last_sync_finished_at: Time.current + ) + end + + def mark_sync_error!(error) + update!(last_sync_error: "#{error.class}: #{error.message}") + end + + def mark_synced_through!(version_id) + return unless version_id + + with_lock do + update!( + last_synced_version_id: [ + last_synced_version_id, + version_id + ].compact.max + ) + end + end + + def stale_sync? + syncing && syncing_started_at.present? && syncing_started_at <= self.class.sync_lock_timeout.ago + end + + def clear_stale_sync! + return unless stale_sync? + + MR.log_with_labels( + :warn, + 'Clearing stale envelope graph sync lock', + envelope_community: envelope_community.name, + syncing_started_at: syncing_started_at.iso8601, + timeout_seconds: self.class.sync_lock_timeout.to_i + ) + + update!( + syncing: false, + syncing_started_at: nil, + last_sync_finished_at: Time.current, + last_sync_error: 'Stale sync lock cleared after timeout' + ) + end +end diff --git a/app/services/submit_envelope_download_workflow.rb b/app/services/submit_envelope_download_workflow.rb index 45d2bcf2..24edd89a 100644 --- a/app/services/submit_envelope_download_workflow.rb +++ b/app/services/submit_envelope_download_workflow.rb @@ -1,6 +1,8 @@ require 'argo_workflows_client' class SubmitEnvelopeDownloadWorkflow + TEMPLATE_NAME = 's3-graphs-zip'.freeze + def self.call(envelope_download:) new(envelope_download).call end @@ -16,7 +18,7 @@ def call return envelope_download if workflow_already_started? workflow = client.submit_workflow( - template_name: ENV.fetch('ARGO_WORKFLOWS_TEMPLATE_NAME'), + template_name: TEMPLATE_NAME, generate_name: "#{community_name.tr('_', '-')}-download-", parameters: ) diff --git a/app/services/submit_partial_graph_index_workflow.rb b/app/services/submit_partial_graph_index_workflow.rb new file mode 100644 index 00000000..1f12c484 --- /dev/null +++ b/app/services/submit_partial_graph_index_workflow.rb @@ -0,0 +1,61 @@ +require 'argo_workflows_client' + +class SubmitPartialGraphIndexWorkflow + TEMPLATE_NAME = 'update-index-graphs-input-file-elasticsearch'.freeze + + def self.call(envelope_community:, manifest_key:) + new(envelope_community: envelope_community, manifest_key: manifest_key).call + end + + attr_reader :envelope_community, :manifest_key + + def initialize(envelope_community:, manifest_key:) + @envelope_community = envelope_community + @manifest_key = manifest_key + end + + def call + workflow = client.submit_workflow( + template_name: TEMPLATE_NAME, + generate_name: "#{community_name.tr('_', '-')}-partial-graph-index-", + parameters: parameters + ) + workflow_name = workflow.dig(:metadata, :name) + raise 'Argo workflow submission did not return a workflow name' if workflow_name.blank? + + workflow + end + + private + + def client + @client ||= ArgoWorkflowsClient.new + end + + def community_name + envelope_community.name + end + + def task_image + ENV.fetch( + 'ARGO_WORKFLOWS_PARTIAL_GRAPH_INDEX_TASK_IMAGE', + ENV.fetch('ARGO_WORKFLOWS_TASK_IMAGE') + ) + end + + def parameters + { + 'task-image' => task_image, + 'index-name' => ENV.fetch('ARGO_WORKFLOWS_PARTIAL_GRAPH_INDEX_NAME'), + 'input-bucket' => graphs_bucket, + 'input-file-key' => manifest_key, + 'source-bucket' => graphs_bucket, + 'prefix-path' => '', + 'aws-region' => ENV.fetch('AWS_REGION') + } + end + + def graphs_bucket + ENV.fetch('ENVELOPE_GRAPHS_BUCKET') + end +end diff --git a/app/services/sync_pending_envelope_graphs_with_s3.rb b/app/services/sync_pending_envelope_graphs_with_s3.rb new file mode 100644 index 00000000..d01ceb15 --- /dev/null +++ b/app/services/sync_pending_envelope_graphs_with_s3.rb @@ -0,0 +1,166 @@ +require 'json' +require 'set' +require 'stringio' +require 'zlib' +require 'submit_partial_graph_index_workflow' + +# Syncs pending envelope graph changes to S3 in a single batch and writes a manifest. +class SyncPendingEnvelopeGraphsWithS3 + attr_reader :cutoff_version_id, :envelope_community, :sync + + def initialize(envelope_community:, cutoff_version_id:, sync: nil) + @envelope_community = envelope_community + @cutoff_version_id = cutoff_version_id + @sync = sync + end + + def call + versions = latest_versions + return mark_synced! if versions.empty? + + return mark_synced! unless s3_bucket_name + + actions = versions.filter_map { |version| sync_version(version) } + + submit_partial_graph_index_workflow(upload_manifest(actions)) + mark_synced! + end + + private + + def latest_versions + scope = EnvelopeVersion + .where(item_type: 'Envelope', envelope_community_id: envelope_community.id) + .where.not(envelope_ceterms_ctid: nil) + .where('id <= ?', cutoff_version_id) + + version_id = sync&.last_synced_version_id + scope = scope.where('id > ?', version_id) if version_id + + latest_version_ids = scope + .select('MAX(versions.id)') + .group(:envelope_ceterms_ctid) + + EnvelopeVersion + .where(id: latest_version_ids) + .order(:id) + end + + def sync_version(version) + if version.event == 'destroy' + delete_version(version) + else + upload_version(version) + end + end + + def upload_version(version) + return if superseded_after_cutoff_ctids.include?(version.envelope_ceterms_ctid) + + envelope = Envelope.unscoped.find_by(id: version.item_id) + return delete_version(version) unless envelope + + object = s3_bucket.object(s3_key(version)) + object.put(body: envelope.processed_resource.to_json, content_type: 'application/json') + + s3_url = object.public_url + envelope.update_column(:s3_url, s3_url) + + action_payload(version, action: 'upload', s3_url: s3_url) + end + + def delete_version(version) + return if superseded_after_cutoff_ctids.include?(version.envelope_ceterms_ctid) + + s3_bucket.object(s3_key(version)).delete + + action_payload(version, action: 'delete') + end + + def superseded_after_cutoff_ctids + @superseded_after_cutoff_ctids ||= EnvelopeVersion + .where(item_type: 'Envelope', + envelope_community_id: envelope_community.id) + .where.not(envelope_ceterms_ctid: nil) + .where('id > ?', cutoff_version_id) + .distinct + .pluck(:envelope_ceterms_ctid) + .to_set + end + + def action_payload(version, action:, s3_url: nil) + { + envelope_ceterms_ctid: version.envelope_ceterms_ctid, + action: action, + s3_key: s3_key(version), + s3_url: s3_url, + updated_at: version.created_at.iso8601 + }.compact + end + + def mark_synced! + sync&.mark_synced_through!(cutoff_version_id) + end + + def upload_manifest(actions) + manifest_items = actions.filter_map { |action| action[:s3_key] if action[:action] == 'upload' } + return if manifest_items.empty? + + body = gzip_json({ items: manifest_items }) + key = manifest_key + + [key, latest_manifest_key].each do |manifest_object_key| + s3_bucket.object(manifest_object_key).put( + body: body, + content_encoding: 'gzip', + content_type: 'application/json' + ) + end + + key + end + + def manifest_key + timestamp = Time.current.utc.iso8601(6).tr(':', '-') + "#{envelope_community.name}/manifests/partial-graphs/#{timestamp}.json.gz" + end + + def latest_manifest_key + "#{envelope_community.name}/manifests/partial-graphs/latest.json.gz" + end + + def submit_partial_graph_index_workflow(manifest_key) + return if manifest_key.blank? + + SubmitPartialGraphIndexWorkflow.call( + envelope_community: envelope_community, + manifest_key: manifest_key + ) + end + + def gzip_json(payload) + io = StringIO.new + + Zlib::GzipWriter.wrap(io) do |gzip| + gzip.write(JSON.generate(payload)) + end + + io.string + end + + def s3_key(version) + "#{envelope_community.name}/#{version.envelope_ceterms_ctid}.json" + end + + def s3_bucket + @s3_bucket ||= s3_resource.bucket(s3_bucket_name) + end + + def s3_bucket_name + ENV['ENVELOPE_GRAPHS_BUCKET'].presence + end + + def s3_resource + @s3_resource ||= Aws::S3::Resource.new(region: ENV['AWS_REGION'].presence) + end +end diff --git a/db/migrate/20260403120000_create_envelope_graph_syncs.rb b/db/migrate/20260403120000_create_envelope_graph_syncs.rb new file mode 100644 index 00000000..609a1526 --- /dev/null +++ b/db/migrate/20260403120000_create_envelope_graph_syncs.rb @@ -0,0 +1,16 @@ +class CreateEnvelopeGraphSyncs < ActiveRecord::Migration[7.1] + def change + create_table :envelope_graph_syncs do |t| + t.references :envelope_community, null: false, foreign_key: true, index: { unique: true } + t.datetime :last_activity_at, null: false + t.datetime :scheduled_for_at + t.boolean :syncing, null: false, default: false + t.datetime :syncing_started_at + t.datetime :last_sync_finished_at + t.references :last_activity_version, foreign_key: { to_table: :versions } + t.references :last_synced_version, foreign_key: { to_table: :versions } + t.text :last_sync_error + t.timestamps null: false + end + end +end diff --git a/db/structure.sql b/db/structure.sql index c0e20716..26652c6d 100644 --- a/db/structure.sql +++ b/db/structure.sql @@ -1,4 +1,4 @@ -\restrict 0dy55iEgxshkrNexCflHQqeKePQeKS9cgNddI1yhNU2MlrkmlYja0aF7oIYJK1j +\restrict Q0u15vCqf4lCv5vgbMxeyxBV8Ct49eC5gxSNuzDZrJ1pNvEAgOw4ETPCYfOsztT -- Dumped from database version 16.13 (Debian 16.13-1.pgdg13+1) -- Dumped by pg_dump version 16.13 @@ -339,6 +339,45 @@ CREATE TABLE public.envelope_downloads ( ); +-- +-- Name: envelope_graph_syncs; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.envelope_graph_syncs ( + id bigint NOT NULL, + envelope_community_id bigint NOT NULL, + last_activity_at timestamp(6) without time zone NOT NULL, + scheduled_for_at timestamp(6) without time zone, + syncing boolean DEFAULT false NOT NULL, + syncing_started_at timestamp(6) without time zone, + last_sync_finished_at timestamp(6) without time zone, + last_activity_version_id bigint, + last_synced_version_id bigint, + last_sync_error text, + created_at timestamp(6) without time zone NOT NULL, + updated_at timestamp(6) without time zone NOT NULL +); + + +-- +-- Name: envelope_graph_syncs_id_seq; Type: SEQUENCE; Schema: public; Owner: - +-- + +CREATE SEQUENCE public.envelope_graph_syncs_id_seq + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + + +-- +-- Name: envelope_graph_syncs_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: - +-- + +ALTER SEQUENCE public.envelope_graph_syncs_id_seq OWNED BY public.envelope_graph_syncs.id; + + -- -- Name: envelope_resources; Type: TABLE; Schema: public; Owner: - -- @@ -880,6 +919,13 @@ ALTER TABLE ONLY public.envelope_communities ALTER COLUMN id SET DEFAULT nextval ALTER TABLE ONLY public.envelope_community_configs ALTER COLUMN id SET DEFAULT nextval('public.envelope_community_configs_id_seq'::regclass); +-- +-- Name: envelope_graph_syncs id; Type: DEFAULT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.envelope_graph_syncs ALTER COLUMN id SET DEFAULT nextval('public.envelope_graph_syncs_id_seq'::regclass); + + -- -- Name: envelope_resources id; Type: DEFAULT; Schema: public; Owner: - -- @@ -1028,6 +1074,14 @@ ALTER TABLE ONLY public.envelope_downloads ADD CONSTRAINT envelope_downloads_pkey PRIMARY KEY (id); +-- +-- Name: envelope_graph_syncs envelope_graph_syncs_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.envelope_graph_syncs + ADD CONSTRAINT envelope_graph_syncs_pkey PRIMARY KEY (id); + + -- -- Name: envelope_resources envelope_resources_pkey; Type: CONSTRAINT; Schema: public; Owner: - -- @@ -1512,6 +1566,27 @@ CREATE INDEX index_envelope_community_configs_on_envelope_community_id ON public CREATE UNIQUE INDEX index_envelope_downloads_on_envelope_community_id ON public.envelope_downloads USING btree (envelope_community_id); +-- +-- Name: index_envelope_graph_syncs_on_envelope_community_id; Type: INDEX; Schema: public; Owner: - +-- + +CREATE UNIQUE INDEX index_envelope_graph_syncs_on_envelope_community_id ON public.envelope_graph_syncs USING btree (envelope_community_id); + + +-- +-- Name: index_envelope_graph_syncs_on_last_activity_version_id; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX index_envelope_graph_syncs_on_last_activity_version_id ON public.envelope_graph_syncs USING btree (last_activity_version_id); + + +-- +-- Name: index_envelope_graph_syncs_on_last_synced_version_id; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX index_envelope_graph_syncs_on_last_synced_version_id ON public.envelope_graph_syncs USING btree (last_synced_version_id); + + -- -- Name: index_envelope_resources_on_created_at; Type: INDEX; Schema: public; Owner: - -- @@ -1910,6 +1985,22 @@ ALTER TABLE ONLY public.envelope_community_configs ADD CONSTRAINT fk_rails_27f72c55da FOREIGN KEY (envelope_community_id) REFERENCES public.envelope_communities(id); +-- +-- Name: envelope_graph_syncs fk_rails_340ad8af6c; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.envelope_graph_syncs + ADD CONSTRAINT fk_rails_340ad8af6c FOREIGN KEY (last_activity_version_id) REFERENCES public.versions(id); + + +-- +-- Name: envelope_graph_syncs fk_rails_367da88e56; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.envelope_graph_syncs + ADD CONSTRAINT fk_rails_367da88e56 FOREIGN KEY (envelope_community_id) REFERENCES public.envelope_communities(id); + + -- -- Name: envelopes fk_rails_4833726efb; Type: FK CONSTRAINT; Schema: public; Owner: - -- @@ -2038,6 +2129,14 @@ ALTER TABLE ONLY public.organization_publishers ADD CONSTRAINT fk_rails_f1e2e64cfa FOREIGN KEY (publisher_id) REFERENCES public.publishers(id) ON DELETE CASCADE; +-- +-- Name: envelope_graph_syncs fk_rails_f33622674f; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.envelope_graph_syncs + ADD CONSTRAINT fk_rails_f33622674f FOREIGN KEY (last_synced_version_id) REFERENCES public.versions(id); + + -- -- Name: envelopes fk_rails_fbac8d1e0a; Type: FK CONSTRAINT; Schema: public; Owner: - -- @@ -2050,11 +2149,12 @@ ALTER TABLE ONLY public.envelopes -- PostgreSQL database dump complete -- -\unrestrict 0dy55iEgxshkrNexCflHQqeKePQeKS9cgNddI1yhNU2MlrkmlYja0aF7oIYJK1j +\unrestrict Q0u15vCqf4lCv5vgbMxeyxBV8Ct49eC5gxSNuzDZrJ1pNvEAgOw4ETPCYfOsztT SET search_path TO "$user", public; INSERT INTO "schema_migrations" (version) VALUES +('20260403120000'), ('20260319120000'), ('20260310005238'), ('20260306120000'), diff --git a/spec/api/v1/envelopes_spec.rb b/spec/api/v1/envelopes_spec.rb index 189f8528..e67a28c1 100644 --- a/spec/api/v1/envelopes_spec.rb +++ b/spec/api/v1/envelopes_spec.rb @@ -339,9 +339,11 @@ expect_json('enqueued_at', now.as_json) expect_json('status', 'pending') - expect(ActiveJob::Base.queue_adapter.enqueued_jobs.size).to eq(1) + jobs = ActiveJob::Base.queue_adapter.enqueued_jobs + matching_jobs = jobs.select { |job| job.fetch('job_class') == 'DownloadEnvelopesJob' } + expect(matching_jobs.size).to eq(1) - job = ActiveJob::Base.queue_adapter.enqueued_jobs.first + job = matching_jobs.first expect(job.fetch('arguments')).to eq([envelope_download.id]) expect(job.fetch('job_class')).to eq('DownloadEnvelopesJob') end diff --git a/spec/api/v1/publish_lock_spec.rb b/spec/api/v1/publish_lock_spec.rb new file mode 100644 index 00000000..3be38fb0 --- /dev/null +++ b/spec/api/v1/publish_lock_spec.rb @@ -0,0 +1,29 @@ +RSpec.describe API::V1::Publish do + let!(:ce_registry) { create(:envelope_community, name: 'ce_registry') } + let(:organization) { create(:organization) } + let(:user) { create(:user) } + let(:resource_json) do + File.read( + MR.root_path.join('db', 'seeds', 'ce_registry', 'credential.json') + ) + end + + before do + create(:organization_publisher, organization:, publisher: user.publisher) + EnvelopeGraphSync.create!( + envelope_community: ce_registry, + last_activity_at: Time.current, + syncing: true, + syncing_started_at: Time.current + ) + end + + it 'rejects publish while S3 sync is in progress' do + post "/resources/organizations/#{organization._ctid}/documents", + resource_json, + 'Authorization' => "Token #{user.auth_token.value}" + + expect_status(503) + expect_json('errors.0', EnvelopeGraphSync::PUBLISH_LOCKED) + end +end diff --git a/spec/api/v2/publish_spec.rb b/spec/api/v2/publish_spec.rb index 286856a7..8fe7c343 100644 --- a/spec/api/v2/publish_spec.rb +++ b/spec/api/v2/publish_spec.rb @@ -62,6 +62,30 @@ expect(ActiveJob::Base.queue_adapter.enqueued_jobs.first[:job]).to eq PublishEnvelopeJob end end + + context 'when S3 sync is in progress' do # rubocop:todo RSpec/ContextWording, RSpec/MultipleMemoizedHelpers + before do + create(:organization_publisher, organization: organization, publisher: user.publisher) + EnvelopeGraphSync.create!( + envelope_community: ce_registry, + last_activity_at: Time.current, + syncing: true, + syncing_started_at: Time.current + ) + + post "/resources/organizations/#{organization._ctid}/documents", + resource_json, + { + 'Accept-Version' => 'v2', + 'Authorization' => "Token #{user.auth_token.value}" + } + end + + it 'rejects publish with a locked response' do + expect_status(503) + expect_json('errors.0', EnvelopeGraphSync::PUBLISH_LOCKED) + end + end # rubocop:enable RSpec/MultipleMemoizedHelpers end # rubocop:enable RSpec/MultipleMemoizedHelpers diff --git a/spec/jobs/sync_envelope_graphs_with_s3_job_spec.rb b/spec/jobs/sync_envelope_graphs_with_s3_job_spec.rb new file mode 100644 index 00000000..47f3fc16 --- /dev/null +++ b/spec/jobs/sync_envelope_graphs_with_s3_job_spec.rb @@ -0,0 +1,115 @@ +require 'sync_envelope_graphs_with_s3_job' + +RSpec.describe SyncEnvelopeGraphsWithS3Job do + subject(:job) { described_class } + + let(:envelope_community) { create(:envelope_community, name: 'ce_registry') } + let(:sync) do + EnvelopeGraphSync.create!( + envelope_community:, + last_activity_at: last_activity_at, + scheduled_for_at: scheduled_for_at, + last_activity_version_id: version.id + ) + end + let(:scheduled_for_at) { Time.current } + let(:version) do + EnvelopeVersion.create!( + item_type: 'Envelope', + item_id: 1, + event: 'create', + envelope_community_id: envelope_community.id, + envelope_ceterms_ctid: 'ce-123', + created_at: Time.current + ) + end + + describe '#perform' do + context 'when the quiet period has not elapsed' do + let(:last_activity_at) { 30.seconds.ago } + + it 'reschedules the sync job for the end of the debounce window' do + allow(SyncPendingEnvelopeGraphsWithS3).to receive(:new) + + expect do + described_class.new.perform(sync.id) + end.to change { ActiveJob::Base.queue_adapter.enqueued_jobs.size }.by(1) + + queued_job = ActiveJob::Base.queue_adapter.enqueued_jobs.last + expect(queued_job.fetch('job_class')).to eq(described_class.to_s) + expect(queued_job.fetch('arguments')).to eq([sync.id]) + + expect(sync.reload.scheduled_for_at.to_i).to eq((last_activity_at + 60.seconds).to_i) + expect(SyncPendingEnvelopeGraphsWithS3).not_to have_received(:new) + end + end + + context 'when the quiet period has elapsed' do + let(:last_activity_at) { 61.seconds.ago } + let(:service) { instance_double(SyncPendingEnvelopeGraphsWithS3, call: true) } + + it 'flushes the pending S3 sync batch' do + allow(SyncPendingEnvelopeGraphsWithS3).to receive(:new).and_return(service) + + job.perform_now(sync.id) + + expect(SyncPendingEnvelopeGraphsWithS3).to have_received(:new).with( + envelope_community:, + cutoff_version_id: version.id, + sync: + ) + expect(service).to have_received(:call) + expect(sync.reload.scheduled_for_at).to be_nil + expect(sync.reload.syncing).to be(false) + expect(sync.reload.syncing_started_at).to be_nil + end + end + + context 'when the batch sync raises' do + let(:last_activity_at) { 61.seconds.ago } + let(:service) { instance_double(SyncPendingEnvelopeGraphsWithS3) } + + it 'records the error and clears the lock' do + allow(SyncPendingEnvelopeGraphsWithS3).to receive(:new).and_return(service) + allow(service).to receive(:call).and_raise(StandardError, 'boom') + + expect do + job.perform_now(sync.id) + end.to raise_error(StandardError, 'boom') + + sync.reload + expect(sync.syncing).to be(false) + expect(sync.syncing_started_at).to be_nil + expect(sync.last_sync_error).to eq('StandardError: boom') + expect(sync.last_sync_finished_at).not_to be_nil + end + end + + context 'when a retry runs after a failed flush cleared scheduled_for_at' do + let(:last_activity_at) { 61.seconds.ago } + let(:service) { instance_double(SyncPendingEnvelopeGraphsWithS3) } + + it 'still performs the pending sync' do + allow(SyncPendingEnvelopeGraphsWithS3).to receive(:new).and_return(service) + allow(service).to receive(:call).and_raise(StandardError, 'boom').once + + expect do + job.perform_now(sync.id) + end.to raise_error(StandardError, 'boom') + + sync.reload + expect(sync.scheduled_for_at).to be_nil + expect(sync.last_synced_version_id).to be_nil + + allow(service).to receive(:call).and_return(true) + + expect do + job.perform_now(sync.id) + end.not_to raise_error + + expect(SyncPendingEnvelopeGraphsWithS3).to have_received(:new).twice + expect(service).to have_received(:call).twice + end + end + end +end diff --git a/spec/models/envelope_graph_sync_spec.rb b/spec/models/envelope_graph_sync_spec.rb new file mode 100644 index 00000000..8b8844fd --- /dev/null +++ b/spec/models/envelope_graph_sync_spec.rb @@ -0,0 +1,89 @@ +require 'envelope_graph_sync' + +RSpec.describe EnvelopeGraphSync, type: :model do + let(:envelope_community) { create(:envelope_community, name: 'ce_registry') } + let(:version) do + EnvelopeVersion.create!( + item_type: 'Envelope', + item_id: 1, + event: 'create', + envelope_community_id: envelope_community.id, + envelope_ceterms_ctid: 'ce-123', + created_at: Time.current + ) + end + + describe '.record_activity!' do + it 'schedules only one debounced job while the current one is still pending' do + freeze_time do + described_class.record_activity!(envelope_community, version_id: version.id) + first_scheduled_for = described_class.find_by!(envelope_community:).scheduled_for_at + travel 30.seconds + expected_activity_at = Time.current + + expect do + described_class.record_activity!(envelope_community, version_id: version.id) + end.not_to change { ActiveJob::Base.queue_adapter.enqueued_jobs.size } + + sync = described_class.find_by!(envelope_community:) + expect(sync.scheduled_for_at.to_i).to eq(first_scheduled_for.to_i) + expect(sync.last_activity_at.to_i).to eq(expected_activity_at.to_i) + expect(sync.last_activity_version_id).to eq(version.id) + end + end + end + + describe '.syncing?' do + it 'returns true for an active sync lock' do + described_class.create!( + envelope_community:, + last_activity_at: Time.current, + syncing: true, + syncing_started_at: Time.current + ) + + expect(described_class.syncing?(envelope_community)).to be(true) + end + + it 'returns false for a stale sync lock' do + described_class.create!( + envelope_community:, + last_activity_at: Time.current, + syncing: true, + syncing_started_at: 16.minutes.ago + ) + + expect(described_class.syncing?(envelope_community)).to be(false) + sync = described_class.find_by!(envelope_community:) + expect(sync.syncing).to be(false) + expect(sync.last_sync_error).to eq('Stale sync lock cleared after timeout') + expect(sync.last_sync_finished_at).not_to be_nil + end + end + + describe '#mark_synced_through!' do + let(:newer_version) do + EnvelopeVersion.create!( + item_type: 'Envelope', + item_id: 2, + event: 'create', + envelope_community_id: envelope_community.id, + envelope_ceterms_ctid: 'ce-456', + created_at: Time.current + ) + end + + it 'does not move the synced version backward' do + older_version_id = version.id + sync = described_class.create!( + envelope_community: envelope_community, + last_activity_at: Time.current, + last_synced_version_id: newer_version.id + ) + + sync.mark_synced_through!(older_version_id) + + expect(sync.reload.last_synced_version_id).to eq(newer_version.id) + end + end +end diff --git a/spec/models/envelope_spec.rb b/spec/models/envelope_spec.rb index aa88b8bb..636eae1c 100644 --- a/spec/models/envelope_spec.rb +++ b/spec/models/envelope_spec.rb @@ -20,6 +20,10 @@ end describe 'callbacks' do + before do + allow(EnvelopeGraphSync).to receive(:record_activity!) + end + it 'generates an envelope id if it does not exist' do envelope = create(:envelope, envelope_id: nil) @@ -83,6 +87,34 @@ end.to change { envelope.reload.last_verified_on }.to(updated_date) end end + + it 'records S3 sync activity with the envelope version after commit' do + envelope = nil + + with_versioning do + envelope = create(:envelope, :from_cer) + end + + expect(EnvelopeGraphSync).to have_received(:record_activity!).with( + envelope.envelope_community, + version_id: envelope.versions.last.id + ) + end + + it 'records S3 sync activity with the destroy version after destroy' do + envelope = nil + + with_versioning do + envelope = create(:envelope, :from_cer) + + expect(EnvelopeGraphSync).to receive(:record_activity!).with( + envelope.envelope_community, + version_id: kind_of(Integer) + ) + + envelope.destroy + end + end end describe 'select_scope' do diff --git a/spec/services/submit_envelope_download_workflow_spec.rb b/spec/services/submit_envelope_download_workflow_spec.rb index aa6a3a53..b86200ee 100644 --- a/spec/services/submit_envelope_download_workflow_spec.rb +++ b/spec/services/submit_envelope_download_workflow_spec.rb @@ -10,7 +10,6 @@ before do allow(ArgoWorkflowsClient).to receive(:new).and_return(client) allow(ENV).to receive(:fetch).and_call_original - allow(ENV).to receive(:fetch).with('ARGO_WORKFLOWS_TEMPLATE_NAME').and_return('s3-graphs-zip') allow(ENV).to receive(:fetch).with('ARGO_WORKFLOWS_TASK_IMAGE').and_return('registry:s3-graphs-zip') allow(ENV).to receive(:fetch).with('ARGO_WORKFLOWS_BATCH_SIZE', '25000').and_return('25000') allow(ENV).to receive(:fetch) diff --git a/spec/services/submit_partial_graph_index_workflow_spec.rb b/spec/services/submit_partial_graph_index_workflow_spec.rb new file mode 100644 index 00000000..0f5f4058 --- /dev/null +++ b/spec/services/submit_partial_graph_index_workflow_spec.rb @@ -0,0 +1,45 @@ +require 'submit_partial_graph_index_workflow' + +require 'spec_helper' + +RSpec.describe SubmitPartialGraphIndexWorkflow do + let(:client) { instance_double(ArgoWorkflowsClient, namespace: 'credreg-staging') } + let(:community) { EnvelopeCommunity.find_or_create_by!(name: 'ce_registry') } + let(:workflow) { { metadata: { name: 'ce-registry-partial-graph-index-abc123' } } } + + before do + allow(ArgoWorkflowsClient).to receive(:new).and_return(client) + allow(ENV).to receive(:fetch).and_call_original + allow(ENV).to receive(:fetch).with('ARGO_WORKFLOWS_TASK_IMAGE').and_return('registry:workflow-tasks') + allow(ENV).to receive(:fetch) + .with('ARGO_WORKFLOWS_PARTIAL_GRAPH_INDEX_TASK_IMAGE', 'registry:workflow-tasks') + .and_return('registry:workflow-tasks') + allow(ENV).to receive(:fetch).with('ARGO_WORKFLOWS_PARTIAL_GRAPH_INDEX_NAME').and_return('graphs') + allow(ENV).to receive(:fetch).with('ENVELOPE_GRAPHS_BUCKET').and_return('graphs-bucket') + allow(ENV).to receive(:fetch).with('AWS_REGION').and_return('us-east-1') + end + + it 'submits the partial graph indexing workflow' do + allow(client).to receive(:submit_workflow) + .with( + template_name: 'update-index-graphs-input-file-elasticsearch', + generate_name: 'ce-registry-partial-graph-index-', + parameters: { + 'task-image' => 'registry:workflow-tasks', + 'index-name' => 'graphs', + 'input-bucket' => 'graphs-bucket', + 'input-file-key' => 'ce_registry/manifests/partial-graphs/2026.json.gz', + 'source-bucket' => 'graphs-bucket', + 'prefix-path' => '', + 'aws-region' => 'us-east-1' + } + ).and_return(workflow) + + result = described_class.call( + envelope_community: community, + manifest_key: 'ce_registry/manifests/partial-graphs/2026.json.gz' + ) + + expect(result).to eq(workflow) + end +end diff --git a/spec/services/sync_envelope_graph_with_s3_spec.rb b/spec/services/sync_envelope_graph_with_s3_spec.rb index 2ec8f604..2a81b774 100644 --- a/spec/services/sync_envelope_graph_with_s3_spec.rb +++ b/spec/services/sync_envelope_graph_with_s3_spec.rb @@ -1,13 +1,25 @@ RSpec.describe SyncEnvelopeGraphWithS3 do # rubocop:todo RSpec/MultipleMemoizedHelpers - let(:envelope) { build(:envelope, :from_cer) } + let(:envelope) { create(:envelope, :from_cer) } let(:s3_bucket) { double('s3_bucket') } # rubocop:todo RSpec/VerifiedDoubles let(:s3_bucket_name) { Faker::Lorem.word } let(:s3_object) { double('s3_object') } # rubocop:todo RSpec/VerifiedDoubles let(:s3_region) { 'aws-s3_region-test' } let(:s3_resource) { double('s3_resource') } # rubocop:todo RSpec/VerifiedDoubles let(:s3_url) { Faker::Internet.url } + let(:original_s3_bucket) { ENV['ENVELOPE_GRAPHS_BUCKET'] } + let(:original_aws_region) { ENV['AWS_REGION'] } + + after do + ENV['ENVELOPE_GRAPHS_BUCKET'] = original_s3_bucket + ENV['AWS_REGION'] = original_aws_region + end context 'without bucket' do # rubocop:todo RSpec/MultipleMemoizedHelpers + before do + ENV['ENVELOPE_GRAPHS_BUCKET'] = nil + ENV['AWS_REGION'] = nil + end + describe '.upload' do # rubocop:todo RSpec/MultipleMemoizedHelpers it 'does nothing' do expect { described_class.upload(envelope) }.not_to raise_error @@ -47,32 +59,26 @@ .and_return(s3_object) .at_least(:once) - # rubocop:todo RSpec/MessageSpies - expect(s3_object).to receive(:put).with( # rubocop:todo RSpec/ExpectInHook, RSpec/MessageSpies - # rubocop:enable RSpec/MessageSpies - body: envelope.processed_resource.to_json, - content_type: 'application/json' - ) - - # rubocop:todo RSpec/StubbedMock - # rubocop:todo RSpec/MessageSpies - expect(s3_object).to receive(:public_url).and_return(s3_url) # rubocop:todo RSpec/ExpectInHook, RSpec/MessageSpies, RSpec/StubbedMock - # rubocop:enable RSpec/MessageSpies - # rubocop:enable RSpec/StubbedMock + allow(s3_object).to receive(:put) + allow(s3_object).to receive(:public_url).and_return(s3_url) end describe '.upload' do # rubocop:todo RSpec/MultipleMemoizedHelpers it 'uploads the s3_resource to S3' do - envelope.save! + described_class.upload(envelope) + + expect(s3_object).to have_received(:put).with( + body: envelope.processed_resource.to_json, + content_type: 'application/json' + ) expect(envelope.s3_url).to eq(s3_url) end end describe '.remove' do # rubocop:todo RSpec/MultipleMemoizedHelpers - it 'uploads the s3_resource to S3' do + it 'deletes the s3_resource from S3' do expect(s3_object).to receive(:delete) # rubocop:todo RSpec/MessageSpies - envelope.save! - expect { envelope.destroy }.not_to raise_error + expect { described_class.remove(envelope) }.not_to raise_error end end end diff --git a/spec/services/sync_pending_envelope_graphs_with_s3_spec.rb b/spec/services/sync_pending_envelope_graphs_with_s3_spec.rb new file mode 100644 index 00000000..60fdfdc3 --- /dev/null +++ b/spec/services/sync_pending_envelope_graphs_with_s3_spec.rb @@ -0,0 +1,143 @@ +require 'sync_pending_envelope_graphs_with_s3' + +RSpec.describe SyncPendingEnvelopeGraphsWithS3 do # rubocop:todo RSpec/MultipleMemoizedHelpers + let(:envelope_community) { create(:envelope_community, name: 'ce_registry') } + let(:cutoff_version_id) { @cutoff_version_id } + let(:sync) do + EnvelopeGraphSync.find_or_create_by!(envelope_community:) do |record| + record.last_activity_at = Time.current + record.last_activity_version_id = cutoff_version_id + end + end + let(:service) do + described_class.new( + envelope_community: envelope_community, + cutoff_version_id: cutoff_version_id, + sync: sync + ) + end + let(:s3_bucket) { double('s3_bucket') } # rubocop:todo RSpec/VerifiedDoubles + let(:s3_bucket_name) { Faker::Lorem.word } + let(:s3_resource) { double('s3_resource') } # rubocop:todo RSpec/VerifiedDoubles + let(:upload_object) { double('upload_object') } # rubocop:todo RSpec/VerifiedDoubles + let(:delete_object) { double('delete_object') } # rubocop:todo RSpec/VerifiedDoubles + let(:timestamped_manifest_object) { double('timestamped_manifest_object') } # rubocop:todo RSpec/VerifiedDoubles + let(:latest_manifest_object) { double('latest_manifest_object') } # rubocop:todo RSpec/VerifiedDoubles + let(:s3_url) { Faker::Internet.url } + let(:argo_workflow) { { metadata: { name: 'ce-registry-partial-graph-index-abc123' } } } + let(:upload_envelope) { @upload_envelope } + let(:delete_envelope) { @delete_envelope } + + before do + with_versioning do + @upload_envelope = create(:envelope, :from_cer, envelope_community: envelope_community) + @delete_envelope = create(:envelope, :from_cer, envelope_community: envelope_community) + @delete_envelope.destroy + end + @cutoff_version_id = EnvelopeVersion.maximum(:id) + + ENV['AWS_REGION'] = 'us-east-1' + ENV['ENVELOPE_GRAPHS_BUCKET'] = s3_bucket_name + + allow(Aws::S3::Resource).to receive(:new).with(region: 'us-east-1').and_return(s3_resource) + allow(s3_resource).to receive(:bucket).with(s3_bucket_name).and_return(s3_bucket) + allow(s3_bucket).to receive(:object).with("#{envelope_community.name}/#{upload_envelope.envelope_ceterms_ctid}.json") + .and_return(upload_object) + allow(s3_bucket).to receive(:object).with("#{envelope_community.name}/#{delete_envelope.envelope_ceterms_ctid}.json") + .and_return(delete_object) + allow(s3_bucket).to receive(:object).with(match(%r{\Ace_registry/manifests/partial-graphs/(?!latest\.json\.gz).+\.json\.gz\z})) + .and_return(timestamped_manifest_object) + allow(s3_bucket).to receive(:object).with('ce_registry/manifests/partial-graphs/latest.json.gz') + .and_return(latest_manifest_object) + + allow(upload_object).to receive(:put) + allow(upload_object).to receive(:public_url).and_return(s3_url) + allow(delete_object).to receive(:delete) + allow(timestamped_manifest_object).to receive(:put) + allow(latest_manifest_object).to receive(:put) + allow(SubmitPartialGraphIndexWorkflow).to receive(:call).and_return(argo_workflow) + end + + it 'syncs pending envelope graph changes and uploads a manifest' do + service.call + + expect(upload_object).to have_received(:put) + expect(delete_object).to have_received(:delete) + expect(timestamped_manifest_object).to have_received(:put) + expect(latest_manifest_object).to have_received(:put) + expect(SubmitPartialGraphIndexWorkflow).to have_received(:call).with( + envelope_community:, + manifest_key: match(%r{\Ace_registry/manifests/partial-graphs/.+\.json\.gz\z}) + ) + expect(upload_envelope.reload.s3_url).to eq(s3_url) + expect(sync.reload.last_synced_version_id).to eq(cutoff_version_id) + end + + it 'writes a gzipped manifest containing uploaded graph keys only' do + service.call + + payload = nil + expect(timestamped_manifest_object).to have_received(:put) do |args| + payload = args.fetch(:body) + expect(args.fetch(:content_encoding)).to eq('gzip') + expect(args.fetch(:content_type)).to eq('application/json') + end + + manifest = Zlib::GzipReader.new(StringIO.new(payload)).read + expect(JSON.parse(manifest)).to eq( + 'items' => ["#{envelope_community.name}/#{upload_envelope.envelope_ceterms_ctid}.json"] + ) + end + + it 'leaves versions newer than the cutoff for the next batch' do + with_versioning do + upload_envelope.update!(envelope_version: '2.0.0') + end + + service.call + + expect(upload_object).not_to have_received(:put) + expect(sync.reload.last_synced_version_id).to eq(cutoff_version_id) + end + + it 'syncs only the latest pending version for each CTID' do + with_versioning do + upload_envelope.update!(envelope_version: '2.0.0') + end + @cutoff_version_id = EnvelopeVersion.maximum(:id) + + service.call + + expect(upload_object).to have_received(:put).once + end + + it 'does not write a manifest or call argo for delete-only batches' do + sync.update!(last_synced_version_id: upload_envelope.versions.last.id) + + service.call + + expect(delete_object).to have_received(:delete) + expect(timestamped_manifest_object).not_to have_received(:put) + expect(latest_manifest_object).not_to have_received(:put) + expect(SubmitPartialGraphIndexWorkflow).not_to have_received(:call) + end + + it 'passes argo the same manifest key that it uploads' do + uploaded_manifest_key = nil + submitted_manifest_key = nil + + allow(s3_bucket).to receive(:object).with(match(%r{\Ace_registry/manifests/partial-graphs/(?!latest\.json\.gz).+\.json\.gz\z})) do |key| + uploaded_manifest_key = key + timestamped_manifest_object + end + + allow(SubmitPartialGraphIndexWorkflow).to receive(:call) do |envelope_community:, manifest_key:| + submitted_manifest_key = manifest_key + argo_workflow + end + + service.call + + expect(uploaded_manifest_key).to eq(submitted_manifest_key) + end +end