Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 9 additions & 60 deletions charts/fetcher/files/rabbitmq/load_definitions.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,35 +22,17 @@
}
],
"queues": [
{
"name": "fetcher.generate-fetcher.queue",
"vhost": "/",
"durable": true,
"arguments": {
"x-dead-letter-exchange": "fetcher.generate-fetcher.dlx",
"x-dead-letter-routing-key": "fetcher.generate-fetcher.dlq.key"
}
},
{
"name": "fetcher.generate-fetcher.dlq",
"vhost": "/",
"durable": true,
"arguments": {
"x-message-ttl": 604800000,
"x-max-length": 10000
}
},
{
"name": "fetcher.extract-external-data.queue",
"vhost": "/",
"durable": true,
"arguments": {
"x-dead-letter-exchange": "fetcher.extract-external-data.dlx",
"x-dead-letter-routing-key": "fetcher.extract-external-data.dlq.key"
"x-dead-letter-exchange": "fetcher.dlx",
"x-dead-letter-routing-key": "fetcher.dlq.key"
}
},
{
"name": "fetcher.extract-external-data.dlq",
"name": "fetcher.dlq",
"vhost": "/",
"durable": true,
"arguments": {
Expand All @@ -61,58 +43,25 @@
],
"exchanges": [
{
"name": "fetcher.generate-fetcher.exchange",
"name": "fetcher.dlx",
"vhost": "/",
"type": "direct",
"durable": true
},
{
"name": "fetcher.generate-fetcher.dlx",
"name": "fetcher.job.events",
"vhost": "/",
"type": "direct",
"durable": true
},
{
"name": "fetcher.extract-external-data.exchange",
"vhost": "/",
"type": "direct",
"durable": true
},
{
"name": "fetcher.extract-external-data.dlx",
"vhost": "/",
"type": "direct",
"type": "topic",
"durable": true
}
],
"bindings": [
{
"source": "fetcher.generate-fetcher.exchange",
"vhost": "/",
"destination": "fetcher.generate-fetcher.queue",
"destination_type": "queue",
"routing_key": "fetcher.generate-fetcher.key"
},
{
"source": "fetcher.generate-fetcher.dlx",
"vhost": "/",
"destination": "fetcher.generate-fetcher.dlq",
"destination_type": "queue",
"routing_key": "fetcher.generate-fetcher.dlq.key"
},
{
"source": "fetcher.extract-external-data.exchange",
"vhost": "/",
"destination": "fetcher.extract-external-data.queue",
"destination_type": "queue",
"routing_key": "fetcher.extract-external-data.key"
},
{
"source": "fetcher.extract-external-data.dlx",
"source": "fetcher.dlx",
"vhost": "/",
"destination": "fetcher.extract-external-data.dlq",
"destination": "fetcher.dlq",
"destination_type": "queue",
"routing_key": "fetcher.extract-external-data.dlq.key"
"routing_key": "fetcher.dlq.key"
}
]
}
100 changes: 62 additions & 38 deletions charts/fetcher/templates/bootstrap-rabbitmq.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,49 +70,73 @@ spec:
echo "API URL: $BASE_URL"
echo ""

echo "Checking if RabbitMQ definitions already exist..."
VHOST="%2F"

# Check if plugin user already exists
PLUGIN_EXISTS=$(curl -sSk -u "$RABBITMQ_ADMIN_USER:$RABBITMQ_ADMIN_PASS" \
"$BASE_URL/api/users/plugin" 2>/dev/null || echo "not_found")
# apply METHOD PATH JSON_BODY
# Idempotent: PUT for resources, POST for bindings. 2xx is success,
# 4xx with precondition_failed means existing resource has different
# arguments (fail loud so drift is not silently masked).
apply() {
local method="$1" path="$2" data="$3"
local http
http=$(curl -sSk -o /tmp/response.txt -w "%{http_code}" \
-u "$RABBITMQ_ADMIN_USER:$RABBITMQ_ADMIN_PASS" \
-H "content-type: application/json" \
-X "$method" --data "$data" \
"$BASE_URL$path")
if [ "$http" -lt 200 ] || [ "$http" -ge 300 ]; then
echo "Error on $method $path (HTTP $http):"
cat /tmp/response.txt
return 1
fi
echo " $method $path -> $http"
}

# Check if fetcher queue exists
QUEUE_EXISTS=$(curl -sSk -u "$RABBITMQ_ADMIN_USER:$RABBITMQ_ADMIN_PASS" \
"$BASE_URL/api/queues/%2F/fetcher.generate-fetcher.queue" 2>/dev/null || echo "not_found")
echo "Ensuring vhost and user..."
apply PUT "/api/vhosts/$VHOST" '{}'
apply PUT "/api/users/plugin" "{\"password\":\"$RABBITMQ_PLUGIN_PASS\",\"tags\":\"administrator\"}"
apply PUT "/api/permissions/$VHOST/plugin" '{"configure":".*","write":".*","read":".*"}'

if echo "$PLUGIN_EXISTS" | grep -q '"name":"plugin"' && \
echo "$QUEUE_EXISTS" | grep -q '"name":"fetcher.generate-fetcher.queue"'; then
echo "RabbitMQ definitions already applied (user plugin and fetcher queues exist). Skipping."
exit 0
fi
# Manager publishes to the default exchange ("") with routing key
# equal to the queue name, so no application exchange is needed
# for the work queue. Only the shared DLX and the job-events
# topic exchange are declared here.
#
# NOTE: fetcher.job.events is ALSO declared by the Reporter chart
# (charts/reporter/templates/common/bootstrap-rabbitmq.yaml) as a
# defensive self-bootstrap, so that Reporter can be installed and
# become fully operational without depending on Fetcher install
# order. Both charts MUST keep these args identical — divergence
# will cause the second bootstrap to fail with precondition_failed
# at the RabbitMQ Management API. Treat fetcher.job.events as a
# shared topology contract: changes here require a matching change
# in the Reporter chart in the same PR.
echo "Ensuring exchanges..."
apply PUT "/api/exchanges/$VHOST/fetcher.dlx" '{"type":"direct","durable":true,"auto_delete":false,"internal":false,"arguments":{}}'
apply PUT "/api/exchanges/$VHOST/fetcher.job.events" '{"type":"topic","durable":true,"auto_delete":false,"internal":false,"arguments":{}}'

echo "Applying RabbitMQ definitions from file..."
HTTP_CODE=$(curl -sSk -o /tmp/response.txt -w "%{http_code}" \
-u "$RABBITMQ_ADMIN_USER:$RABBITMQ_ADMIN_PASS" \
-H "content-type: application/json" \
-X POST \
--data-binary @/definitions/load_definitions.json \
"$BASE_URL/api/definitions")
if [ "$HTTP_CODE" -lt 200 ] || [ "$HTTP_CODE" -ge 300 ]; then
echo "Error applying definitions (HTTP $HTTP_CODE):"
cat /tmp/response.txt
exit 1
fi
echo "Done."
echo "Ensuring queues..."
apply PUT "/api/queues/$VHOST/fetcher.extract-external-data.queue" '{"durable":true,"auto_delete":false,"arguments":{"x-dead-letter-exchange":"fetcher.dlx","x-dead-letter-routing-key":"fetcher.dlq.key"}}'
apply PUT "/api/queues/$VHOST/fetcher.dlq" '{"durable":true,"auto_delete":false,"arguments":{"x-message-ttl":604800000,"x-max-length":10000}}'

echo "Updating RabbitMQ user: plugin..."
HTTP_CODE=$(curl -sSk -o /tmp/response.txt -w "%{http_code}" \
-u "$RABBITMQ_ADMIN_USER:$RABBITMQ_ADMIN_PASS" \
-H "content-type: application/json" \
-X PUT \
--data "{\"password\":\"$RABBITMQ_PLUGIN_PASS\",\"tags\":\"administrator\"}" \
"$BASE_URL/api/users/plugin")
if [ "$HTTP_CODE" -lt 200 ] || [ "$HTTP_CODE" -ge 300 ]; then
echo "Error updating plugin user (HTTP $HTTP_CODE):"
cat /tmp/response.txt
exit 1
fi
echo "Done."
# Bindings via POST. RabbitMQ accepts duplicate POSTs to the same
# binding by creating a separate binding entry; to keep idempotent
# behavior, check first and only create when absent.
ensure_binding() {
local exchange="$1" queue="$2" routing_key="$3"
local found
found=$(curl -sSk -u "$RABBITMQ_ADMIN_USER:$RABBITMQ_ADMIN_PASS" \
"$BASE_URL/api/bindings/$VHOST/e/$exchange/q/$queue" \
| grep -c "\"routing_key\":\"$routing_key\"" || true)
if [ "$found" -gt 0 ]; then
echo " binding $exchange -> $queue ($routing_key) already exists"
return 0
fi
apply POST "/api/bindings/$VHOST/e/$exchange/q/$queue" "{\"routing_key\":\"$routing_key\",\"arguments\":{}}"
}

echo "Ensuring bindings..."
ensure_binding fetcher.dlx fetcher.dlq fetcher.dlq.key

echo ""
echo "=== Fetcher RabbitMQ Bootstrap completed successfully ==="
Expand Down
29 changes: 29 additions & 0 deletions charts/reporter/files/rabbitmq/load_definitions.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@
"x-message-ttl": 604800000,
"x-max-length": 10000
}
},
{
"name": "reporter.fetcher.job.events",
"vhost": "/",
"durable": true,
"arguments": {
"x-dead-letter-exchange": "reporter.dlx",
"x-dead-letter-routing-key": "reporter.dlq.key"
}
}
],
"exchanges": [
Expand All @@ -53,6 +62,12 @@
"vhost": "/",
"type": "direct",
"durable": true
},
{
"name": "fetcher.job.events",
"vhost": "/",
"type": "topic",
"durable": true
}
],
"bindings": [
Expand All @@ -69,6 +84,20 @@
"destination": "reporter.dlq",
"destination_type": "queue",
"routing_key": "reporter.dlq.key"
},
{
"source": "fetcher.job.events",
"vhost": "/",
"destination": "reporter.fetcher.job.events",
"destination_type": "queue",
"routing_key": "job.completed.reporter"
},
{
"source": "fetcher.job.events",
"vhost": "/",
"destination": "reporter.fetcher.job.events",
"destination_type": "queue",
"routing_key": "job.failed.reporter"
}
]
}
107 changes: 72 additions & 35 deletions charts/reporter/templates/common/bootstrap-rabbitmq.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,50 +65,87 @@ spec:
BASE_URL="$RABBITMQ_PROTOCOL://$RABBITMQ_HOST:$RABBITMQ_PORT"
fi

echo "=== RabbitMQ Bootstrap ==="
echo "=== Reporter RabbitMQ Bootstrap ==="
echo "API URL: $BASE_URL"
echo ""

echo "Checking if RabbitMQ user already exists..."
VHOST="%2F"

PLUGIN_EXISTS=$(curl -sSk -u "$RABBITMQ_ADMIN_USER:$RABBITMQ_ADMIN_PASS" \
"$BASE_URL/api/users/plugin" 2>/dev/null || echo "not_found")
# apply METHOD PATH JSON_BODY
# Idempotent: PUT for resources, POST for bindings. 2xx is success,
# 4xx with precondition_failed means existing resource has different
# arguments (fail loud so drift is not silently masked).
apply() {
local method="$1" path="$2" data="$3"
local http
http=$(curl -sSk -o /tmp/response.txt -w "%{http_code}" \
-u "$RABBITMQ_ADMIN_USER:$RABBITMQ_ADMIN_PASS" \
-H "content-type: application/json" \
-X "$method" --data "$data" \
"$BASE_URL$path")
if [ "$http" -lt 200 ] || [ "$http" -ge 300 ]; then
echo "Error on $method $path (HTTP $http):"
cat /tmp/response.txt
return 1
fi
echo " $method $path -> $http"
}

if echo "$PLUGIN_EXISTS" | grep -q '"name":"plugin"'; then
echo "RabbitMQ definitions already applied (user plugin exists). Skipping."
exit 0
fi
echo "Ensuring vhost and user..."
apply PUT "/api/vhosts/$VHOST" '{}'
apply PUT "/api/users/plugin" "{\"password\":\"$RABBITMQ_APP_PASS\",\"tags\":\"administrator\"}"
apply PUT "/api/permissions/$VHOST/plugin" '{"configure":".*","write":".*","read":".*"}'

echo "Applying RabbitMQ definitions from file..."
HTTP_CODE=$(curl -sSk -o /tmp/response.txt -w "%{http_code}" \
-u "$RABBITMQ_ADMIN_USER:$RABBITMQ_ADMIN_PASS" \
-H "content-type: application/json" \
-X POST \
--data-binary @/definitions/load_definitions.json \
"$BASE_URL/api/definitions")
if [ "$HTTP_CODE" -lt 200 ] || [ "$HTTP_CODE" -ge 300 ]; then
echo "Error applying definitions (HTTP $HTTP_CODE):"
cat /tmp/response.txt
exit 1
fi
echo "Done."
# Reporter declares fetcher.job.events defensively. The Fetcher
# chart (charts/fetcher/templates/bootstrap-rabbitmq.yaml) also
# declares this exchange with identical args. PUT with matching
# args is idempotent in RabbitMQ, so whichever chart bootstraps
# first creates it and the other is a no-op. This removes the
# cross-chart ordering requirement — Reporter can be installed
# and become fully operational with all bindings already in
# place, and the Fetcher integration starts working as soon as
# the Fetcher worker comes online (whether seconds, hours or
# days later).
#
# IMPORTANT: fetcher.job.events is a shared topology contract.
# Both charts MUST keep these args identical — divergence will
# cause the second bootstrap to fail with precondition_failed
# at the RabbitMQ Management API. Changes here require a
# matching change in the Fetcher chart in the same PR.
echo "Ensuring exchanges..."
apply PUT "/api/exchanges/$VHOST/reporter.generate-report.exchange" '{"type":"direct","durable":true,"auto_delete":false,"internal":false,"arguments":{}}'
apply PUT "/api/exchanges/$VHOST/reporter.dlx" '{"type":"direct","durable":true,"auto_delete":false,"internal":false,"arguments":{}}'
apply PUT "/api/exchanges/$VHOST/fetcher.job.events" '{"type":"topic","durable":true,"auto_delete":false,"internal":false,"arguments":{}}'

echo "Updating RabbitMQ user: plugin..."
HTTP_CODE=$(curl -sSk -o /tmp/response.txt -w "%{http_code}" \
-u "$RABBITMQ_ADMIN_USER:$RABBITMQ_ADMIN_PASS" \
-H "content-type: application/json" \
-X PUT \
--data "{\"password\":\"$RABBITMQ_APP_PASS\",\"tags\":\"administrator\"}" \
"$BASE_URL/api/users/plugin")
if [ "$HTTP_CODE" -lt 200 ] || [ "$HTTP_CODE" -ge 300 ]; then
echo "Error updating plugin user (HTTP $HTTP_CODE):"
cat /tmp/response.txt
exit 1
fi
echo "Done."
echo "Ensuring queues..."
apply PUT "/api/queues/$VHOST/reporter.generate-report.queue" '{"durable":true,"auto_delete":false,"arguments":{"x-dead-letter-exchange":"reporter.dlx","x-dead-letter-routing-key":"reporter.dlq.key"}}'
apply PUT "/api/queues/$VHOST/reporter.dlq" '{"durable":true,"auto_delete":false,"arguments":{"x-message-ttl":604800000,"x-max-length":10000}}'
apply PUT "/api/queues/$VHOST/reporter.fetcher.job.events" '{"durable":true,"auto_delete":false,"arguments":{"x-dead-letter-exchange":"reporter.dlx","x-dead-letter-routing-key":"reporter.dlq.key"}}'

# Bindings via POST. RabbitMQ accepts duplicate POSTs to the same
# binding by creating a separate binding entry; to keep idempotent
# behavior, check first and only create when absent.
ensure_binding() {
local exchange="$1" queue="$2" routing_key="$3"
local found
found=$(curl -sSk -u "$RABBITMQ_ADMIN_USER:$RABBITMQ_ADMIN_PASS" \
"$BASE_URL/api/bindings/$VHOST/e/$exchange/q/$queue" \
| grep -c "\"routing_key\":\"$routing_key\"" || true)
if [ "$found" -gt 0 ]; then
echo " binding $exchange -> $queue ($routing_key) already exists"
return 0
fi
apply POST "/api/bindings/$VHOST/e/$exchange/q/$queue" "{\"routing_key\":\"$routing_key\",\"arguments\":{}}"
}

echo "Ensuring bindings..."
ensure_binding reporter.generate-report.exchange reporter.generate-report.queue reporter.generate-report.key
ensure_binding reporter.dlx reporter.dlq reporter.dlq.key
ensure_binding fetcher.job.events reporter.fetcher.job.events job.completed.reporter
ensure_binding fetcher.job.events reporter.fetcher.job.events job.failed.reporter

echo ""
echo "=== RabbitMQ Bootstrap completed successfully ==="
echo "=== Reporter RabbitMQ Bootstrap completed successfully ==="
env:
- name: RABBITMQ_PROTOCOL
value: {{ .Values.externalRabbitmqDefinitions.connection.protocol | quote }}
Expand Down
Loading