diff --git a/charts/fetcher/files/rabbitmq/load_definitions.json b/charts/fetcher/files/rabbitmq/load_definitions.json index 5677f75f..0f17a914 100644 --- a/charts/fetcher/files/rabbitmq/load_definitions.json +++ b/charts/fetcher/files/rabbitmq/load_definitions.json @@ -22,35 +22,17 @@ } ], "queues": [ - { - "name": "fetcher.generate-fetcher.queue", - "vhost": "/", - "durable": true, - "arguments": { - "x-dead-letter-exchange": "fetcher.generate-fetcher.dlx", - "x-dead-letter-routing-key": "fetcher.generate-fetcher.dlq.key" - } - }, - { - "name": "fetcher.generate-fetcher.dlq", - "vhost": "/", - "durable": true, - "arguments": { - "x-message-ttl": 604800000, - "x-max-length": 10000 - } - }, { "name": "fetcher.extract-external-data.queue", "vhost": "/", "durable": true, "arguments": { - "x-dead-letter-exchange": "fetcher.extract-external-data.dlx", - "x-dead-letter-routing-key": "fetcher.extract-external-data.dlq.key" + "x-dead-letter-exchange": "fetcher.dlx", + "x-dead-letter-routing-key": "fetcher.dlq.key" } }, { - "name": "fetcher.extract-external-data.dlq", + "name": "fetcher.dlq", "vhost": "/", "durable": true, "arguments": { @@ -61,58 +43,25 @@ ], "exchanges": [ { - "name": "fetcher.generate-fetcher.exchange", + "name": "fetcher.dlx", "vhost": "/", "type": "direct", "durable": true }, { - "name": "fetcher.generate-fetcher.dlx", + "name": "fetcher.job.events", "vhost": "/", - "type": "direct", - "durable": true - }, - { - "name": "fetcher.extract-external-data.exchange", - "vhost": "/", - "type": "direct", - "durable": true - }, - { - "name": "fetcher.extract-external-data.dlx", - "vhost": "/", - "type": "direct", + "type": "topic", "durable": true } ], "bindings": [ { - "source": "fetcher.generate-fetcher.exchange", - "vhost": "/", - "destination": "fetcher.generate-fetcher.queue", - "destination_type": "queue", - "routing_key": "fetcher.generate-fetcher.key" - }, - { - "source": "fetcher.generate-fetcher.dlx", - "vhost": "/", - "destination": "fetcher.generate-fetcher.dlq", - "destination_type": "queue", - "routing_key": "fetcher.generate-fetcher.dlq.key" - }, - { - "source": "fetcher.extract-external-data.exchange", - "vhost": "/", - "destination": "fetcher.extract-external-data.queue", - "destination_type": "queue", - "routing_key": "fetcher.extract-external-data.key" - }, - { - "source": "fetcher.extract-external-data.dlx", + "source": "fetcher.dlx", "vhost": "/", - "destination": "fetcher.extract-external-data.dlq", + "destination": "fetcher.dlq", "destination_type": "queue", - "routing_key": "fetcher.extract-external-data.dlq.key" + "routing_key": "fetcher.dlq.key" } ] } diff --git a/charts/fetcher/templates/bootstrap-rabbitmq.yaml b/charts/fetcher/templates/bootstrap-rabbitmq.yaml index 3d38e723..ffdc63b4 100644 --- a/charts/fetcher/templates/bootstrap-rabbitmq.yaml +++ b/charts/fetcher/templates/bootstrap-rabbitmq.yaml @@ -70,49 +70,73 @@ spec: echo "API URL: $BASE_URL" echo "" - echo "Checking if RabbitMQ definitions already exist..." + VHOST="%2F" - # Check if plugin user already exists - PLUGIN_EXISTS=$(curl -sSk -u "$RABBITMQ_ADMIN_USER:$RABBITMQ_ADMIN_PASS" \ - "$BASE_URL/api/users/plugin" 2>/dev/null || echo "not_found") + # apply METHOD PATH JSON_BODY + # Idempotent: PUT for resources, POST for bindings. 2xx is success, + # 4xx with precondition_failed means existing resource has different + # arguments (fail loud so drift is not silently masked). + apply() { + local method="$1" path="$2" data="$3" + local http + http=$(curl -sSk -o /tmp/response.txt -w "%{http_code}" \ + -u "$RABBITMQ_ADMIN_USER:$RABBITMQ_ADMIN_PASS" \ + -H "content-type: application/json" \ + -X "$method" --data "$data" \ + "$BASE_URL$path") + if [ "$http" -lt 200 ] || [ "$http" -ge 300 ]; then + echo "Error on $method $path (HTTP $http):" + cat /tmp/response.txt + return 1 + fi + echo " $method $path -> $http" + } - # Check if fetcher queue exists - QUEUE_EXISTS=$(curl -sSk -u "$RABBITMQ_ADMIN_USER:$RABBITMQ_ADMIN_PASS" \ - "$BASE_URL/api/queues/%2F/fetcher.generate-fetcher.queue" 2>/dev/null || echo "not_found") + echo "Ensuring vhost and user..." + apply PUT "/api/vhosts/$VHOST" '{}' + apply PUT "/api/users/plugin" "{\"password\":\"$RABBITMQ_PLUGIN_PASS\",\"tags\":\"administrator\"}" + apply PUT "/api/permissions/$VHOST/plugin" '{"configure":".*","write":".*","read":".*"}' - if echo "$PLUGIN_EXISTS" | grep -q '"name":"plugin"' && \ - echo "$QUEUE_EXISTS" | grep -q '"name":"fetcher.generate-fetcher.queue"'; then - echo "RabbitMQ definitions already applied (user plugin and fetcher queues exist). Skipping." - exit 0 - fi + # Manager publishes to the default exchange ("") with routing key + # equal to the queue name, so no application exchange is needed + # for the work queue. Only the shared DLX and the job-events + # topic exchange are declared here. + # + # NOTE: fetcher.job.events is ALSO declared by the Reporter chart + # (charts/reporter/templates/common/bootstrap-rabbitmq.yaml) as a + # defensive self-bootstrap, so that Reporter can be installed and + # become fully operational without depending on Fetcher install + # order. Both charts MUST keep these args identical — divergence + # will cause the second bootstrap to fail with precondition_failed + # at the RabbitMQ Management API. Treat fetcher.job.events as a + # shared topology contract: changes here require a matching change + # in the Reporter chart in the same PR. + echo "Ensuring exchanges..." + apply PUT "/api/exchanges/$VHOST/fetcher.dlx" '{"type":"direct","durable":true,"auto_delete":false,"internal":false,"arguments":{}}' + apply PUT "/api/exchanges/$VHOST/fetcher.job.events" '{"type":"topic","durable":true,"auto_delete":false,"internal":false,"arguments":{}}' - echo "Applying RabbitMQ definitions from file..." - HTTP_CODE=$(curl -sSk -o /tmp/response.txt -w "%{http_code}" \ - -u "$RABBITMQ_ADMIN_USER:$RABBITMQ_ADMIN_PASS" \ - -H "content-type: application/json" \ - -X POST \ - --data-binary @/definitions/load_definitions.json \ - "$BASE_URL/api/definitions") - if [ "$HTTP_CODE" -lt 200 ] || [ "$HTTP_CODE" -ge 300 ]; then - echo "Error applying definitions (HTTP $HTTP_CODE):" - cat /tmp/response.txt - exit 1 - fi - echo "Done." + echo "Ensuring queues..." + apply PUT "/api/queues/$VHOST/fetcher.extract-external-data.queue" '{"durable":true,"auto_delete":false,"arguments":{"x-dead-letter-exchange":"fetcher.dlx","x-dead-letter-routing-key":"fetcher.dlq.key"}}' + apply PUT "/api/queues/$VHOST/fetcher.dlq" '{"durable":true,"auto_delete":false,"arguments":{"x-message-ttl":604800000,"x-max-length":10000}}' - echo "Updating RabbitMQ user: plugin..." - HTTP_CODE=$(curl -sSk -o /tmp/response.txt -w "%{http_code}" \ - -u "$RABBITMQ_ADMIN_USER:$RABBITMQ_ADMIN_PASS" \ - -H "content-type: application/json" \ - -X PUT \ - --data "{\"password\":\"$RABBITMQ_PLUGIN_PASS\",\"tags\":\"administrator\"}" \ - "$BASE_URL/api/users/plugin") - if [ "$HTTP_CODE" -lt 200 ] || [ "$HTTP_CODE" -ge 300 ]; then - echo "Error updating plugin user (HTTP $HTTP_CODE):" - cat /tmp/response.txt - exit 1 - fi - echo "Done." + # Bindings via POST. RabbitMQ accepts duplicate POSTs to the same + # binding by creating a separate binding entry; to keep idempotent + # behavior, check first and only create when absent. + ensure_binding() { + local exchange="$1" queue="$2" routing_key="$3" + local found + found=$(curl -sSk -u "$RABBITMQ_ADMIN_USER:$RABBITMQ_ADMIN_PASS" \ + "$BASE_URL/api/bindings/$VHOST/e/$exchange/q/$queue" \ + | grep -c "\"routing_key\":\"$routing_key\"" || true) + if [ "$found" -gt 0 ]; then + echo " binding $exchange -> $queue ($routing_key) already exists" + return 0 + fi + apply POST "/api/bindings/$VHOST/e/$exchange/q/$queue" "{\"routing_key\":\"$routing_key\",\"arguments\":{}}" + } + + echo "Ensuring bindings..." + ensure_binding fetcher.dlx fetcher.dlq fetcher.dlq.key echo "" echo "=== Fetcher RabbitMQ Bootstrap completed successfully ===" diff --git a/charts/reporter/files/rabbitmq/load_definitions.json b/charts/reporter/files/rabbitmq/load_definitions.json index fc1ed29c..d0251018 100644 --- a/charts/reporter/files/rabbitmq/load_definitions.json +++ b/charts/reporter/files/rabbitmq/load_definitions.json @@ -39,6 +39,15 @@ "x-message-ttl": 604800000, "x-max-length": 10000 } + }, + { + "name": "reporter.fetcher.job.events", + "vhost": "/", + "durable": true, + "arguments": { + "x-dead-letter-exchange": "reporter.dlx", + "x-dead-letter-routing-key": "reporter.dlq.key" + } } ], "exchanges": [ @@ -53,6 +62,12 @@ "vhost": "/", "type": "direct", "durable": true + }, + { + "name": "fetcher.job.events", + "vhost": "/", + "type": "topic", + "durable": true } ], "bindings": [ @@ -69,6 +84,20 @@ "destination": "reporter.dlq", "destination_type": "queue", "routing_key": "reporter.dlq.key" + }, + { + "source": "fetcher.job.events", + "vhost": "/", + "destination": "reporter.fetcher.job.events", + "destination_type": "queue", + "routing_key": "job.completed.reporter" + }, + { + "source": "fetcher.job.events", + "vhost": "/", + "destination": "reporter.fetcher.job.events", + "destination_type": "queue", + "routing_key": "job.failed.reporter" } ] } \ No newline at end of file diff --git a/charts/reporter/templates/common/bootstrap-rabbitmq.yaml b/charts/reporter/templates/common/bootstrap-rabbitmq.yaml index 0cb8afdd..712558c2 100644 --- a/charts/reporter/templates/common/bootstrap-rabbitmq.yaml +++ b/charts/reporter/templates/common/bootstrap-rabbitmq.yaml @@ -65,50 +65,87 @@ spec: BASE_URL="$RABBITMQ_PROTOCOL://$RABBITMQ_HOST:$RABBITMQ_PORT" fi - echo "=== RabbitMQ Bootstrap ===" + echo "=== Reporter RabbitMQ Bootstrap ===" echo "API URL: $BASE_URL" echo "" - echo "Checking if RabbitMQ user already exists..." + VHOST="%2F" - PLUGIN_EXISTS=$(curl -sSk -u "$RABBITMQ_ADMIN_USER:$RABBITMQ_ADMIN_PASS" \ - "$BASE_URL/api/users/plugin" 2>/dev/null || echo "not_found") + # apply METHOD PATH JSON_BODY + # Idempotent: PUT for resources, POST for bindings. 2xx is success, + # 4xx with precondition_failed means existing resource has different + # arguments (fail loud so drift is not silently masked). + apply() { + local method="$1" path="$2" data="$3" + local http + http=$(curl -sSk -o /tmp/response.txt -w "%{http_code}" \ + -u "$RABBITMQ_ADMIN_USER:$RABBITMQ_ADMIN_PASS" \ + -H "content-type: application/json" \ + -X "$method" --data "$data" \ + "$BASE_URL$path") + if [ "$http" -lt 200 ] || [ "$http" -ge 300 ]; then + echo "Error on $method $path (HTTP $http):" + cat /tmp/response.txt + return 1 + fi + echo " $method $path -> $http" + } - if echo "$PLUGIN_EXISTS" | grep -q '"name":"plugin"'; then - echo "RabbitMQ definitions already applied (user plugin exists). Skipping." - exit 0 - fi + echo "Ensuring vhost and user..." + apply PUT "/api/vhosts/$VHOST" '{}' + apply PUT "/api/users/plugin" "{\"password\":\"$RABBITMQ_APP_PASS\",\"tags\":\"administrator\"}" + apply PUT "/api/permissions/$VHOST/plugin" '{"configure":".*","write":".*","read":".*"}' - echo "Applying RabbitMQ definitions from file..." - HTTP_CODE=$(curl -sSk -o /tmp/response.txt -w "%{http_code}" \ - -u "$RABBITMQ_ADMIN_USER:$RABBITMQ_ADMIN_PASS" \ - -H "content-type: application/json" \ - -X POST \ - --data-binary @/definitions/load_definitions.json \ - "$BASE_URL/api/definitions") - if [ "$HTTP_CODE" -lt 200 ] || [ "$HTTP_CODE" -ge 300 ]; then - echo "Error applying definitions (HTTP $HTTP_CODE):" - cat /tmp/response.txt - exit 1 - fi - echo "Done." + # Reporter declares fetcher.job.events defensively. The Fetcher + # chart (charts/fetcher/templates/bootstrap-rabbitmq.yaml) also + # declares this exchange with identical args. PUT with matching + # args is idempotent in RabbitMQ, so whichever chart bootstraps + # first creates it and the other is a no-op. This removes the + # cross-chart ordering requirement — Reporter can be installed + # and become fully operational with all bindings already in + # place, and the Fetcher integration starts working as soon as + # the Fetcher worker comes online (whether seconds, hours or + # days later). + # + # IMPORTANT: fetcher.job.events is a shared topology contract. + # Both charts MUST keep these args identical — divergence will + # cause the second bootstrap to fail with precondition_failed + # at the RabbitMQ Management API. Changes here require a + # matching change in the Fetcher chart in the same PR. + echo "Ensuring exchanges..." + apply PUT "/api/exchanges/$VHOST/reporter.generate-report.exchange" '{"type":"direct","durable":true,"auto_delete":false,"internal":false,"arguments":{}}' + apply PUT "/api/exchanges/$VHOST/reporter.dlx" '{"type":"direct","durable":true,"auto_delete":false,"internal":false,"arguments":{}}' + apply PUT "/api/exchanges/$VHOST/fetcher.job.events" '{"type":"topic","durable":true,"auto_delete":false,"internal":false,"arguments":{}}' - echo "Updating RabbitMQ user: plugin..." - HTTP_CODE=$(curl -sSk -o /tmp/response.txt -w "%{http_code}" \ - -u "$RABBITMQ_ADMIN_USER:$RABBITMQ_ADMIN_PASS" \ - -H "content-type: application/json" \ - -X PUT \ - --data "{\"password\":\"$RABBITMQ_APP_PASS\",\"tags\":\"administrator\"}" \ - "$BASE_URL/api/users/plugin") - if [ "$HTTP_CODE" -lt 200 ] || [ "$HTTP_CODE" -ge 300 ]; then - echo "Error updating plugin user (HTTP $HTTP_CODE):" - cat /tmp/response.txt - exit 1 - fi - echo "Done." + echo "Ensuring queues..." + apply PUT "/api/queues/$VHOST/reporter.generate-report.queue" '{"durable":true,"auto_delete":false,"arguments":{"x-dead-letter-exchange":"reporter.dlx","x-dead-letter-routing-key":"reporter.dlq.key"}}' + apply PUT "/api/queues/$VHOST/reporter.dlq" '{"durable":true,"auto_delete":false,"arguments":{"x-message-ttl":604800000,"x-max-length":10000}}' + apply PUT "/api/queues/$VHOST/reporter.fetcher.job.events" '{"durable":true,"auto_delete":false,"arguments":{"x-dead-letter-exchange":"reporter.dlx","x-dead-letter-routing-key":"reporter.dlq.key"}}' + + # Bindings via POST. RabbitMQ accepts duplicate POSTs to the same + # binding by creating a separate binding entry; to keep idempotent + # behavior, check first and only create when absent. + ensure_binding() { + local exchange="$1" queue="$2" routing_key="$3" + local found + found=$(curl -sSk -u "$RABBITMQ_ADMIN_USER:$RABBITMQ_ADMIN_PASS" \ + "$BASE_URL/api/bindings/$VHOST/e/$exchange/q/$queue" \ + | grep -c "\"routing_key\":\"$routing_key\"" || true) + if [ "$found" -gt 0 ]; then + echo " binding $exchange -> $queue ($routing_key) already exists" + return 0 + fi + apply POST "/api/bindings/$VHOST/e/$exchange/q/$queue" "{\"routing_key\":\"$routing_key\",\"arguments\":{}}" + } + + echo "Ensuring bindings..." + ensure_binding reporter.generate-report.exchange reporter.generate-report.queue reporter.generate-report.key + ensure_binding reporter.dlx reporter.dlq reporter.dlq.key + ensure_binding fetcher.job.events reporter.fetcher.job.events job.completed.reporter + ensure_binding fetcher.job.events reporter.fetcher.job.events job.failed.reporter echo "" - echo "=== RabbitMQ Bootstrap completed successfully ===" + echo "=== Reporter RabbitMQ Bootstrap completed successfully ===" env: - name: RABBITMQ_PROTOCOL value: {{ .Values.externalRabbitmqDefinitions.connection.protocol | quote }}