A FastAPI service to trigger Dagster jobs via REST API, designed for integration with change detection services.
This service provides secure REST endpoints to trigger Dagster jobs via GraphQL API, primarily used with changedetection.io to update databases when external data sources change.
graph LR
A[changedetection.io] -->|Webhook| B[Trigger Service]
B -->|GraphQL| C[Dagster]
C -->|Execute| D[Job]
- π Secure job triggering with API key authentication
- π₯ Health check endpoint
- β‘ Async job execution
- π‘οΈ Comprehensive error handling and logging
- π Integration with changedetection.io
DAGSTER_HOST=10.10.10.34
DAGSTER_PORT=3000
DAGSTER_TIMEOUT_SECONDS=30
REPOSITORY_LOCATION=dlt_pipelines # Dagster repository location
REPOSITORY_NAME=__repository__ # Dagster repository name
API_KEY=your_secret_key # API key for authentication
flowchart TD
A[Client] -->|API Request| B[FastAPI Service]
B -->|Authenticate| C{API Key Valid?}
C -->|Yes| D[Trigger Job]
C -->|No| E[401 Unauthorized]
D -->|GraphQL| F[Dagster]
F -->|Execute| G[Job Running]
G -->|Complete| H[Return Status]
- Edit
trigger_service/config.py
and add your job toAVAILABLE_JOBS
:
AVAILABLE_JOBS = {
"countries_job": {
"description": "Update countries database from external source",
"repository_location": "dlt_pipelines",
"repository_name": "__repository__",
},
"your_new_job": {
"description": "Description of what this job does",
"repository_location": "custom_location", # Optional
"repository_name": "custom_repo", # Optional
},
}
# Build the image
docker build -t dagster-trigger .
# Run the container
docker run -d \
--name dagster-trigger \
-p 8000:8000 \
-e DAGSTER_HOST=your_dagster_host \
-e DAGSTER_PORT=your_dagster_port \
-e API_KEY=your_secret_key \
dagster-trigger
# Create virtual environment
python -m venv venv
source venv/bin/activate # or `venv\Scripts\activate` on Windows
# Install dependencies
pip install -r requirements.txt
# Run the service
uvicorn trigger_service.trigger:app --host 0.0.0.0 --port 8000
Method | Description | Example |
---|---|---|
Header | Use X-API-Key header | X-API-Key: your_api_key |
URL Param | Include in query string | ?api_key=your_api_key |
curl http://localhost:8000/health
curl -X POST http://localhost:8000/trigger/countries_job -H "X-API-Key: your_api_key"
# or
curl -X POST "http://localhost:8000/trigger/countries_job?api_key=your_api_key"
- Set up changedetection.io to monitor your data source
- Configure a webhook notification:
- URL:
http://your-service:8000/trigger/your_job_name?api_key=your_api_key
- Method: POST
- Content Type: application/json
- URL:
Example changedetection.io configurations:
url: https://api.example.com/data
notification_urls:
- http://localhost:8000/trigger/countries_job?api_key=your_api_key
url: https://example.com/data-page
notification_urls:
- http://localhost:8000/trigger/countries_job?api_key=your_api_key
The service uses Dagster's GraphQL API to trigger jobs. Here's how to find the correct parameters for your setup:
Use this query to find your repository location and name:
query RepositoriesQuery {
repositoriesOrError {
... on RepositoryConnection {
nodes {
name
location {
name
}
}
}
}
}
To see existing runs and their configuration:
query PaginatedPipelineRuns {
pipelineRunsOrError {
__typename
... on PipelineRuns {
results {
runId
pipelineName
status
runConfigYaml
repositoryOrigin {
repositoryLocationName
repositoryName
__typename
id
}
stats {
... on PipelineRunStatsSnapshot {
startTime
endTime
stepsFailed
}
}
}
}
}
}
The service uses this mutation to trigger jobs:
mutation LaunchRunMutation(
$repositoryLocationName: String!
$repositoryName: String!
$jobName: String!
$runConfigData: RunConfigData!
$tags: [ExecutionTag!]
) {
launchRun(
executionParams: {
selector: {
repositoryLocationName: $repositoryLocationName
repositoryName: $repositoryName
jobName: $jobName
}
runConfigData: $runConfigData
executionMetadata: { tags: $tags }
}
) {
__typename
... on LaunchRunSuccess {
run {
runId
}
}
... on RunConfigValidationInvalid {
errors {
message
reason
}
}
... on PythonError {
message
}
}
}
Example variables for launching a job:
{
"runConfigData": "{}",
"repositoryLocationName": "dlt_pipelines",
"repositoryName": "__repository__",
"jobName": "countries_job",
"pipelineName": "countries_job"
}
trigger_service/
βββ __init__.py
βββ config.py # Configuration settings
βββ trigger.py # Main FastAPI application
βββ utils.py # Optional utilities
- Update configuration in
config.py
- Add new endpoints in
trigger.py
- Update tests if applicable
- Update documentation
# Run tests
pytest
# Check code style
flake8 trigger_service
Docker logs can be viewed with:
docker logs -f dagster-trigger
Access FastAPI metrics at /metrics
endpoint (if enabled).
Common issues and solutions:
Issue | Solution |
---|---|
π Connection Refused | Check if Dagster is running and verify DAGSTER_HOST/PORT |
π Job Not Found | Verify job name and repository configuration |
β±οΈ Timeout Issues | Adjust DAGSTER_TIMEOUT_SECONDS |
π Authentication Issues | Verify API_KEY configuration |
- Stack Overflow: Repository Location Name in Dagster
- GitHub Discussion: Launching Dagster Jobs via GraphQL
- Medium Article: Dagster GraphQL Integration
- Use HTTPS in production
- Configure CORS appropriately
- Use environment variables for sensitive data
- Protect API keys and sensitive configuration
MIT License
- Fork the repository
- Create a feature branch
- Submit a pull request
Create an issue in the repository for:
- π Bug reports
- β¨ Feature requests
- β General questions
Made with β€οΈ by ali2kan