Quantiphi-INC/oracle-node

Elephant Express: Simple Usage

This repo deploys an AWS Step Functions (Express) workflow with SQS and Lambda. Follow these steps to get it running quickly.

1) Set environment variables

The oracle node supports two authentication modes:

Option A: Traditional Mode (using API credentials)

Export these environment variables before deploying:

# Required for traditional mode
export ELEPHANT_DOMAIN=...
export ELEPHANT_API_KEY=...
export ELEPHANT_ORACLE_KEY_ID=...
export ELEPHANT_FROM_ADDRESS=...
export ELEPHANT_RPC_URL=...
export ELEPHANT_PINATA_JWT=...

# Optional (deployment)
export STACK_NAME=elephant-oracle-node
export WORKFLOW_QUEUE_NAME=elephant-workflow-queue
export WORKFLOW_STARTER_RESERVED_CONCURRENCY=100
export WORKFLOW_STATE_MACHINE_NAME=ElephantExpressWorkflow

# Optional (Transform scripts upload)
# Default: false (transform scripts are NOT uploaded unless explicitly enabled)
export UPLOAD_TRANSFORMS=true  # Set to 'true' to upload transform scripts from transform/ directory to S3
# You can set TRANSFORM_S3_PREFIX_VALUE manually if scripts already exist in S3 and you want to skip upload.

# Optional (AWS CLI)
export AWS_PROFILE=your-profile
export AWS_REGION=your-region

# Optional (Prepare function flags - only set to 'true' if needed)
export ELEPHANT_PREPARE_USE_BROWSER=false  # Force browser mode
export ELEPHANT_PREPARE_NO_FAST=false      # Disable fast mode
export ELEPHANT_PREPARE_NO_CONTINUE=false  # Disable continue mode
export ELEPHANT_PREPARE_IGNORE_CAPTCHA=false  # Ignore captcha challenges

# Optional (Continue button selector)
export ELEPHANT_PREPARE_CONTINUE_BUTTON=""  # CSS selector for continue button

# Optional (Browser flow template - both must be provided together)
export ELEPHANT_PREPARE_BROWSER_FLOW_TEMPLATE=""    # Browser flow template name
export ELEPHANT_PREPARE_BROWSER_FLOW_PARAMETERS=""  # Browser flow parameters as JSON string

# Optional (Updater schedule - only set if you want to change from default)
export UPDATER_SCHEDULE_RATE="1 minute"    # How often updater runs (default: "1 minute")
# For sub-minute intervals, use cron expressions:
# export UPDATER_SCHEDULE_RATE="cron(*/1 * * * ? *)"  # Every minute
# export UPDATER_SCHEDULE_RATE="cron(0/30 * * * ? *)" # Every 30 seconds (at :00 and :30)

# Optional (Proxy rotation - for automatic proxy rotation support)
export PROXY_FILE=/path/to/proxies.txt  # File containing proxy URLs (one per line: username:password@ip:port)

# Optional (Per-county repair flags written to SSM)
# Set one county at a time
#   REPAIR_COUNTY: County name as it appears in county_jurisdiction (spaces are OK)
#   REPAIR_VALUE:  true|false
export REPAIR_COUNTY="Palm Beach"
export REPAIR_VALUE=true

# Or set multiple counties at once using JSON object of name->boolean
export REPAIR_COUNTIES_JSON='{"Palm Beach": true, "Escambia": false}'

export GITHUB_TOKEN=ghp_xxxxxxxxxxxx  # Your GitHub personal access token (requires 'repo' scope)
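For reference, the file that PROXY_FILE points to is plain text with one proxy per line in the username:password@ip:port format noted above (the values below are placeholders):

```text
# proxies.txt (placeholder values)
alice:s3cret@203.0.113.10:8080
bob:hunter2@203.0.113.11:3128
```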

Option B: Keystore Mode (using encrypted private key)

For non-institutional oracles, you can use your own wallet via a keystore file (an encrypted private key) instead of API credentials. The keystore file follows the EIP-2335 standard for BLS12-381 key encryption.

To create a keystore file, see the Elephant CLI documentation on encrypted JSON keystores.

# Required for keystore mode
export ELEPHANT_KEYSTORE_FILE=/path/to/your/keystore.json  # Path to your keystore JSON file
export ELEPHANT_KEYSTORE_PASSWORD=your-keystore-password   # Password to decrypt the keystore
export ELEPHANT_RPC_URL=...                                # RPC URL for blockchain submission
export ELEPHANT_PINATA_JWT=...                             # Pinata JWT for uploads

# Optional (deployment)
export STACK_NAME=elephant-oracle-node
export WORKFLOW_QUEUE_NAME=elephant-workflow-queue
export WORKFLOW_STARTER_RESERVED_CONCURRENCY=100
export WORKFLOW_STATE_MACHINE_NAME=ElephantExpressWorkflow

# Optional (Transform scripts upload)
# Default: false (transform scripts are NOT uploaded unless explicitly enabled)
export UPLOAD_TRANSFORMS=true  # Set to 'true' to upload transform scripts from transform/ directory to S3
# You can set TRANSFORM_S3_PREFIX_VALUE manually if scripts already exist in S3 and you want to skip upload.

# Optional (AWS CLI)
export AWS_PROFILE=your-profile
export AWS_REGION=your-region

# Optional (Prepare function flags - only set to 'true' if needed)
export ELEPHANT_PREPARE_USE_BROWSER=false  # Force browser mode
export ELEPHANT_PREPARE_NO_FAST=false      # Disable fast mode
export ELEPHANT_PREPARE_NO_CONTINUE=false  # Disable continue mode
export ELEPHANT_PREPARE_IGNORE_CAPTCHA=false  # Ignore captcha challenges

# Optional (Continue button selector)
export ELEPHANT_PREPARE_CONTINUE_BUTTON=""  # CSS selector for continue button

# Optional (Browser flow template - both must be provided together)
export ELEPHANT_PREPARE_BROWSER_FLOW_TEMPLATE=""    # Browser flow template name
export ELEPHANT_PREPARE_BROWSER_FLOW_PARAMETERS=""  # Browser flow parameters as JSON string

export GITHUB_TOKEN=ghp_xxxxxxxxxxxx  # Your GitHub personal access token (requires 'repo' scope)

Important Notes for Keystore Mode:

  • The keystore file must exist at the specified path
  • The password must be correct to decrypt the keystore
  • The keystore file will be securely uploaded to S3 during deployment
  • When using keystore mode, you don't need to provide: ELEPHANT_DOMAIN, ELEPHANT_API_KEY, ELEPHANT_ORACLE_KEY_ID, or ELEPHANT_FROM_ADDRESS
  • To create a keystore file, see the Elephant CLI documentation on encrypted JSON keystores

Browser Flow Files (Optional):

For complex browser automation scenarios, you can provide county-specific browser flow JSON files:

  1. Add JSON files named <CountyName>.json (e.g., Broward.json, Miami-Dade.json) to the browser-flows/ directory in the repository root
  2. These files are automatically uploaded to S3 during deployment
  3. The Lambda function will automatically download and use the appropriate flow file based on the county being processed

Example structure:

oracle-node/
├── browser-flows/
│   ├── Broward.json
│   ├── Miami-Dade.json
│   └── Palm-Beach.json
├── transform/
└── ...

The browser flow file is passed to the prepare function as the browserFlowFile parameter and is automatically cleaned up after use.

Multi-Request Flow Files (Optional):

For counties that require multiple API requests to fetch complete data, you can provide county-specific multi-request flow JSON files. For detailed documentation on the multi-request flows feature, see the Elephant CLI Multi-Request Flows documentation.

  1. Add JSON files named <CountyName>.json (e.g., Manatee.json, Collier.json) to the multi-request-flows/ directory in the repository root
  2. These files are automatically uploaded to S3 during deployment
  3. The Lambda function will automatically download and use the appropriate flow file based on the county being processed

Example structure:

oracle-node/
├── browser-flows/
│   ├── Broward.json
│   └── Palm-Beach.json
├── multi-request-flows/
│   ├── Manatee.json
│   └── Collier.json
├── transform/
└── ...

The multi-request flow file is passed to the prepare function as the multiRequestFlowFile parameter and is automatically cleaned up after use.

Static Parts Files (Optional):

For counties where certain HTML elements remain static (unchanging) across multiple requests, you can provide county-specific static parts CSV files. These files help the mirror validation Lambda identify which content to exclude from completeness checks, improving validation accuracy.

Static parts files can be generated using the identify-static-parts command from the Elephant CLI. For detailed documentation on generating and using static parts files, see the Elephant CLI Static Parts documentation.

  1. Add CSV files named <county>.csv (e.g., collier.csv, palm-beach.csv) to the source-html-static-parts/ directory in the repository root
  2. These files are automatically uploaded to S3 during deployment
  3. The Mirror Validation Lambda will automatically download and use the appropriate file based on the county being processed
  4. If no static parts file exists for a county, mirror validation runs normally without static parts filtering

Example structure:

oracle-node/
├── browser-flows/
│   ├── Broward.json
│   └── Palm-Beach.json
├── multi-request-flows/
│   ├── Manatee.json
│   └── Collier.json
├── source-html-static-parts/
│   ├── collier.csv
│   └── palm-beach.csv
├── transform/
└── ...

CSV Format: The static parts CSV should contain selectors or patterns identifying static HTML elements. Refer to the Elephant CLI documentation for the exact format and usage.

Put your transform files under transform/ (if applicable).

2) Deploy infrastructure

./scripts/deploy-infra.sh

This creates the VPC, S3 buckets, SQS queues, Lambdas, and the Express Step Functions state machine.

Repair mode (per-county) — Skip Prepare when possible

The workflow can skip the Prepare step for specific counties in a "repair" scenario when a prepared output already exists in S3. Control is via SSM Parameters created by the deploy script:

  • SSM parameter path: /${STACK_NAME}/repair/<County_With_Underscores>
  • Value: "true" or "false" (string)

Behavior during execution:

  • Pre Lambda emits county_name and county_key (spaces→underscores).
  • State machine reads /${STACK_NAME}/repair/${county_key}.
  • If value is true, it constructs the expected output path from pre.output_prefix and checks S3 with HeadObject:
    • If the object exists, Prepare is bypassed and the existing output.zip is used.
    • If not, the flow runs Prepare as usual.
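The decision above can be sketched in shell. This is a simplified, illustrative sketch: the real check is done inside the state machine via SSM GetParameter and S3 HeadObject, so the aws calls are only referenced in comments and the SSM value and HeadObject result are mocked as inputs.

```shell
#!/usr/bin/env bash
# Sketch of the repair-mode skip decision (illustrative only).
# county_key mirrors the Pre Lambda's spaces -> underscores conversion.
set -euo pipefail

county_key() {
  local name="$1"
  printf '%s\n' "${name// /_}"
}

should_skip_prepare() {
  local stack="$1" county="$2" output_exists="$3"  # output_exists mocks the S3 HeadObject result
  local key flag
  key="$(county_key "$county")"
  # Real check would read: aws ssm get-parameter --name "/${stack}/repair/${key}"
  flag="${REPAIR_FLAG:-false}"                     # mocked SSM value; a missing parameter means false
  [[ "$flag" == "true" && "$output_exists" == "true" ]]
}

if REPAIR_FLAG=true should_skip_prepare elephant-oracle-node "Palm Beach" true; then
  echo "skip Prepare: reuse existing output.zip"
else
  echo "run Prepare"
fi
```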

Set flags via environment variables before running ./scripts/deploy-infra.sh:

# Single county
export REPAIR_COUNTY="Palm Beach"
export REPAIR_VALUE=true
./scripts/deploy-infra.sh

# Multiple counties
export REPAIR_COUNTIES_JSON='{"Palm Beach": true, "Escambia": false}'
./scripts/deploy-infra.sh

Configure Prepare Function Behavior

The DownloaderFunction uses the prepare command from @elephant-xyz/cli to fetch and process data. You can control its behavior using environment variables that map to CLI flags:

Environment Variable              Default     CLI Flag          Description
ELEPHANT_PREPARE_USE_BROWSER      false       --use-browser     Force browser mode for fetching
ELEPHANT_PREPARE_NO_FAST          false       --no-fast         Disable fast mode
ELEPHANT_PREPARE_NO_CONTINUE      false       --no-continue     Disable continue mode
ELEPHANT_PREPARE_IGNORE_CAPTCHA   false       --ignore-captcha  Ignore captcha challenges
ELEPHANT_PREPARE_CONTINUE_BUTTON  ""          N/A               CSS selector for continue button
UPDATER_SCHEDULE_RATE             "1 minute"  N/A               Updater frequency (e.g., "5 minutes", "cron(*/1 * * * ? *)")

County-Specific Configuration

When processing inputs from multiple counties in the same node, you can provide county-specific configurations that override the general settings. The Lambda automatically detects the county from the county_jurisdiction field in unnormalized_address.json and applies the appropriate configuration.

How it works:

  1. The Lambda reads the county name directly from the unnormalized_address.json file inside the input ZIP (without extracting to disk)
  2. For each configuration variable, it checks for a county-specific version first (e.g., ELEPHANT_PREPARE_USE_BROWSER_Alachua)
  3. If not found, it falls back to the general version (e.g., ELEPHANT_PREPARE_USE_BROWSER)
  4. If neither exists, it uses the default value

Priority order: County-specific → General → Default
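The fallback lookup can be sketched with bash indirect expansion. This is illustrative only (the Lambda resolves these variables in its own runtime), but it shows the county-specific → general → default order and the spaces-to-underscores suffix:

```shell
#!/usr/bin/env bash
# Resolve a prepare flag: county-specific -> general -> default.
resolve_flag() {
  local base="$1" county="$2" default="$3"
  local suffix="${county// /_}"          # spaces become underscores
  local county_var="${base}_${suffix}"
  if [[ -n "${!county_var:-}" ]]; then
    printf '%s\n' "${!county_var}"       # e.g. ELEPHANT_PREPARE_USE_BROWSER_Santa_Rosa
  elif [[ -n "${!base:-}" ]]; then
    printf '%s\n' "${!base}"             # e.g. ELEPHANT_PREPARE_USE_BROWSER
  else
    printf '%s\n' "$default"
  fi
}

export ELEPHANT_PREPARE_USE_BROWSER=false
export ELEPHANT_PREPARE_USE_BROWSER_Santa_Rosa=true

resolve_flag ELEPHANT_PREPARE_USE_BROWSER "Santa Rosa" false   # -> true  (county-specific)
resolve_flag ELEPHANT_PREPARE_USE_BROWSER "Broward" false      # -> false (general)
resolve_flag ELEPHANT_PREPARE_NO_FAST "Broward" false          # -> false (default)
```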

Complete multi-county configuration example:

# General configuration (default for all counties)
export ELEPHANT_PREPARE_USE_BROWSER=false
export ELEPHANT_PREPARE_NO_FAST=false
export ELEPHANT_PREPARE_NO_CONTINUE=false
export ELEPHANT_PREPARE_IGNORE_CAPTCHA=false

# Alachua County - needs browser mode with specific selectors
export ELEPHANT_PREPARE_USE_BROWSER_Alachua=true
export ELEPHANT_PREPARE_CONTINUE_BUTTON_Alachua=".btn.btn-primary.button-1"
export ELEPHANT_PREPARE_BROWSER_FLOW_TEMPLATE_Alachua="SEARCH_BY_PARCEL_ID"
export ELEPHANT_PREPARE_BROWSER_FLOW_PARAMETERS_Alachua='{"search_form_selector": "#ctlBodyPane_ctl03_ctl01_txtParcelID", "search_result_selector": "#ctlBodyPane_ctl10_ctl01_lstBuildings_ctl00_dynamicBuildingDataRightColumn_divSummary"}'

# Sarasota County - different template and slower processing
export ELEPHANT_PREPARE_USE_BROWSER_Sarasota=true
export ELEPHANT_PREPARE_NO_FAST_Sarasota=true
export ELEPHANT_PREPARE_BROWSER_FLOW_TEMPLATE_Sarasota="CUSTOM_SEARCH"
export ELEPHANT_PREPARE_BROWSER_FLOW_PARAMETERS_Sarasota='{"timeout": 60000, "search_selector": "#parcel-search", "submit_button": "#search-submit"}'

# Charlotte County - simple browser mode, no template needed
export ELEPHANT_PREPARE_USE_BROWSER_Charlotte=true

# Santa Rosa County - note the underscore for the space
export ELEPHANT_PREPARE_USE_BROWSER_Santa_Rosa=true
export ELEPHANT_PREPARE_BROWSER_FLOW_TEMPLATE_Santa_Rosa="SANTA_ROSA_FLOW"
export ELEPHANT_PREPARE_BROWSER_FLOW_PARAMETERS_Santa_Rosa='{"timeout": 45000}'

# Palm Beach County - another example with space in name
export ELEPHANT_PREPARE_USE_BROWSER_Palm_Beach=true
export ELEPHANT_PREPARE_NO_FAST_Palm_Beach=true

# Broward County - uses general settings (no browser mode)
# No county-specific settings needed

# Deploy - all county configurations are automatically applied
./scripts/deploy-infra.sh

When the Lambda processes data:

  • Alachua inputs → Uses browser mode with SEARCH_BY_PARCEL_ID template
  • Sarasota inputs → Uses browser mode with CUSTOM_SEARCH template and no-fast mode
  • Charlotte inputs → Uses simple browser mode
  • Santa Rosa inputs → Uses browser mode with SANTA_ROSA_FLOW template (county with space in name)
  • Palm Beach inputs → Uses browser mode with no-fast flag (county with space in name)
  • Broward inputs → Uses general settings (no browser mode)
  • Any other county → Uses general settings

Manual configuration update:

If you need to update county-specific configurations after deployment:

# Set county-specific environment variables
export ELEPHANT_PREPARE_USE_BROWSER_Charlotte=true
export ELEPHANT_PREPARE_BROWSER_FLOW_TEMPLATE_Charlotte="CHARLOTTE_FLOW"

# Apply to existing Lambda
./scripts/set-county-configs.sh

County Repair Flag Reference

  • Parameter Store naming: /${STACK_NAME}/repair/<County_With_Underscores>
  • Example names:
    • /elephant-oracle-node/repair/Palm_Beach
    • /elephant-oracle-node/repair/Santa_Rosa
  • Values are simple strings: "true" or "false"
  • Missing parameter is treated as false.

Supported county-specific variables:

  • ELEPHANT_PREPARE_USE_BROWSER_<CountyName>
  • ELEPHANT_PREPARE_NO_FAST_<CountyName>
  • ELEPHANT_PREPARE_NO_CONTINUE_<CountyName>
  • ELEPHANT_PREPARE_IGNORE_CAPTCHA_<CountyName>
  • ELEPHANT_PREPARE_CONTINUE_BUTTON_<CountyName>
  • ELEPHANT_PREPARE_BROWSER_FLOW_TEMPLATE_<CountyName>
  • ELEPHANT_PREPARE_BROWSER_FLOW_PARAMETERS_<CountyName>

Important naming convention for counties with spaces:

For counties with spaces in their names, replace spaces with underscores in the environment variable name:

  • "Santa Rosa" → ELEPHANT_PREPARE_USE_BROWSER_Santa_Rosa
  • "Palm Beach" → ELEPHANT_PREPARE_BROWSER_FLOW_TEMPLATE_Palm_Beach
  • "San Diego" → ELEPHANT_PREPARE_NO_FAST_San_Diego

The Lambda automatically handles this conversion when matching county names from the data.

Browser Flow Template Configuration

For advanced browser automation scenarios, you can provide custom browser flow templates and parameters. This allows you to customize how the prepare function interacts with different county websites.

Environment Variable                      Default  Description
ELEPHANT_PREPARE_BROWSER_FLOW_TEMPLATE    ""       Browser flow template name to use
ELEPHANT_PREPARE_BROWSER_FLOW_PARAMETERS  ""       JSON string containing parameters for the browser flow template

Important: These two environment variables must be provided together. If only one is set, the deployment will fail with a validation error.

General configuration example:

# Set browser flow template configuration and continue button selector for all counties
export ELEPHANT_PREPARE_CONTINUE_BUTTON=".btn.btn-primary.button-1"
export ELEPHANT_PREPARE_BROWSER_FLOW_TEMPLATE="SEARCH_BY_PARCEL_ID"
export ELEPHANT_PREPARE_BROWSER_FLOW_PARAMETERS='{"search_form_selector": "#ctlBodyPane_ctl03_ctl01_txtParcelID", "search_result_selector": "#ctlBodyPane_ctl10_ctl01_lstBuildings_ctl00_dynamicBuildingDataRightColumn_divSummary"}'

# Deploy with the configuration
./scripts/deploy-infra.sh

County-specific browser flow example:

# Different counties may need different browser flow templates, continue button selectors, and parameters
export ELEPHANT_PREPARE_CONTINUE_BUTTON_Alachua=".btn.btn-primary.button-1"
export ELEPHANT_PREPARE_BROWSER_FLOW_TEMPLATE_Alachua="SEARCH_BY_PARCEL_ID"
export ELEPHANT_PREPARE_BROWSER_FLOW_PARAMETERS_Alachua='{"search_form_selector": "#ctlBodyPane_ctl03_ctl01_txtParcelID", "search_result_selector": "#ctlBodyPane_ctl10_ctl01_lstBuildings_ctl00_dynamicBuildingDataRightColumn_divSummary"}'

export ELEPHANT_PREPARE_CONTINUE_BUTTON_Sarasota="#submit"
export ELEPHANT_PREPARE_BROWSER_FLOW_TEMPLATE_Sarasota="CUSTOM_SEARCH"
export ELEPHANT_PREPARE_BROWSER_FLOW_PARAMETERS_Sarasota='{"timeout": 60000, "selector": "#search-box"}'

# Deploy - each county will use its specific configuration
./scripts/deploy-infra.sh

Technical details:

  • The JSON parameters must be valid JSON
  • The deployment script validates the JSON structure before proceeding
  • Internally, the JSON is converted to a simple key:value format for safe transport through the deployment pipeline
  • The Lambda reconstructs it back to JSON before passing to the prepare function
  • Parameters support strings, numbers, and booleans
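The first two bullets can be reproduced locally with jq before deploying (assuming jq is installed; the deploy script's own validation may differ, and the key:value transport form shown at the end is an assumption for illustration):

```shell
#!/usr/bin/env bash
# Validate browser-flow parameters as JSON before deploying (requires jq).
set -euo pipefail

PARAMS='{"timeout": 45000, "search_selector": "#parcel-search"}'

if printf '%s' "$PARAMS" | jq -e 'type == "object"' > /dev/null; then
  echo "valid JSON object"
else
  echo "invalid: ELEPHANT_PREPARE_BROWSER_FLOW_PARAMETERS must be a JSON object" >&2
  exit 1
fi

# A simple key:value transport form (hypothetical) could look like:
printf '%s' "$PARAMS" | jq -r 'to_entries[] | "\(.key):\(.value)"'
```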

Deploy with custom prepare flags:

# Deploy with browser mode enabled
sam deploy --parameter-overrides \
  ElephantPrepareUseBrowser="true" \
  ElephantPrepareNoFast="false" \
  ElephantPrepareNoContinue="false"


# Or set as environment variables before deploy-infra.sh
export ELEPHANT_PREPARE_USE_BROWSER=true
export ELEPHANT_PREPARE_NO_FAST=true
export UPDATER_SCHEDULE_RATE="2 minutes"
./scripts/deploy-infra.sh

View prepare function logs:

The Lambda logs will show exactly which configuration is being used for each county:

📍 Extracting county information from input...
✅ Detected county: Santa Rosa
Building prepare options...
Event browser setting: undefined (using: true)
Checking environment variables for prepare flags:
🏛️ Looking for county-specific configurations for: Santa Rosa
  Using county-specific: ELEPHANT_PREPARE_USE_BROWSER_Santa_Rosa='true'
✓ Setting useBrowser: true (Force browser mode)
  Using general: ELEPHANT_PREPARE_NO_FAST='false'
✗ Not setting noFast flag (Disable fast mode)
  Using county-specific: ELEPHANT_PREPARE_BROWSER_FLOW_TEMPLATE_Santa_Rosa='SANTA_ROSA_FLOW'
Browser flow template configuration detected:
✓ ELEPHANT_PREPARE_BROWSER_FLOW_TEMPLATE='SANTA_ROSA_FLOW'
  Using county-specific: ELEPHANT_PREPARE_BROWSER_FLOW_PARAMETERS_Santa_Rosa='timeout:45000'
✓ ELEPHANT_PREPARE_BROWSER_FLOW_PARAMETERS parsed successfully:
{
  "timeout": 45000
}
Calling prepare() with these options...

Keystore Mode Details

The keystore mode provides a secure way to manage private keys for blockchain submissions using the industry-standard EIP-2335 encryption format.

How it works:

  1. The deployment script validates that the keystore file exists and the password is provided
  2. The keystore file is securely uploaded to S3 in the environment bucket under keystores/ prefix
  3. Lambda functions are configured with the S3 location and password as encrypted environment variables
  4. During execution, the submit Lambda downloads the keystore from S3 and uses it for blockchain submissions

Creating a Keystore File: You can create a keystore file using the Elephant CLI tool. For detailed instructions, refer to the Elephant CLI Encrypted JSON Keystore documentation.

Security considerations:

  • The keystore uses EIP-2335 standard encryption (PBKDF2 with SHA-256 for key derivation, AES-128-CTR for encryption)
  • The keystore file is stored encrypted in S3 with versioning enabled for audit trails
  • The password is stored as an encrypted environment variable in Lambda
  • Lambda functions have minimal S3 permissions (read-only access to keystores only)
  • The keystore is only downloaded to Lambda's temporary storage during execution and is immediately cleaned up after use

Gas Price Configuration (Keystore Mode Only)

When using keystore mode (self-custodial oracles), you can dynamically configure gas prices via AWS Systems Manager Parameter Store. This allows you to adjust gas fees without redeploying your infrastructure, which is especially useful during periods of network congestion.

Important: This feature is only available for keystore mode. In API mode (institutional oracles), gas prices are managed by Elephant.

Setting Up Gas Price Configuration

Use the set-gas-price.sh script to configure gas prices in SSM Parameter Store at /elephant-oracle-node/gas-price. The system supports three formats:

Option 1: EIP-1559 Format (Recommended)

Provides full control over base fee and priority fee:

./scripts/set-gas-price.sh --max-fee 50 --priority-fee 2

  • --max-fee: Maximum total fee per gas unit (in Gwei)
  • --priority-fee: Tip to miners/validators (in Gwei)

Option 2: Legacy Format

Simple gas price value:

./scripts/set-gas-price.sh --gas-price 50

This sets a gas price of 50 Gwei using the legacy transaction format.

Option 3: Automatic Gas Price

Let the RPC provider determine optimal gas fees:

./scripts/set-gas-price.sh --auto

Viewing Gas Price Configuration

# View current gas price configuration
./scripts/set-gas-price.sh --view

# View parameter history
./scripts/set-gas-price.sh --history

You can also view via AWS Console:

  1. Go to AWS Systems Manager → Parameter Store
  2. Look for /elephant-oracle-node/gas-price

Updating Gas Prices

One of the main benefits of using SSM Parameter Store is the ability to update gas prices without redeploying:

# Increase gas price during high network congestion
./scripts/set-gas-price.sh --max-fee 100 --priority-fee 5

# Lower it when network is quiet
./scripts/set-gas-price.sh --max-fee 30 --priority-fee 1.5

Note: Gas price changes take effect on the next Lambda execution. No redeployment is required.

Typical Gas Price Values

For reference, typical Ethereum mainnet values:

  • Normal conditions: maxFeePerGas: 20-50 Gwei, maxPriorityFeePerGas: 1-3 Gwei
  • High congestion: maxFeePerGas: 100-200 Gwei, maxPriorityFeePerGas: 5-10 Gwei
  • Low activity: maxFeePerGas: 10-20 Gwei, maxPriorityFeePerGas: 1 Gwei
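As a unit reminder, the values above are quoted in Gwei while transactions are priced in wei (1 Gwei = 10^9 wei), so the conversion is:

```shell
#!/usr/bin/env bash
# Gwei values from the table above, expressed in wei (1 Gwei = 10^9 wei).
gwei_to_wei() { echo $(( $1 * 1000000000 )); }

gwei_to_wei 50   # 50 Gwei max fee      -> 50000000000 wei
gwei_to_wei 2    # 2 Gwei priority fee  -> 2000000000 wei
```

Note that bash arithmetic is integer-only, so a fractional value such as the 1.5 Gwei above would need to be expressed directly in wei.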

Monitoring Gas Price Usage

Check your Lambda CloudWatch logs to verify gas price configuration:

# View submit Lambda logs
aws logs tail /aws/lambda/elephant-oracle-node-SubmitterFunction --follow

You should see log entries like:

  • Using EIP-1559 maxFeePerGas: 50 Gwei (for EIP-1559 format)
  • Using EIP-1559 maxPriorityFeePerGas: 2 Gwei
  • OR Using legacy gas price: 50 Gwei (for legacy format)

Custom SSM Parameter Name

By default, the system reads from /elephant-oracle-node/gas-price. To use a different parameter name, set the GAS_PRICE_PARAMETER_NAME environment variable in the Lambda function and update the IAM permissions in prepare/template.yaml accordingly.

Update transform scripts

Transforms are stored as raw files under transform/<county>/. Each county folder can contain any structure you need (for example transform/brevard/scripts/*.js).

Important: By default, transform scripts are NOT uploaded during deployment (UPLOAD_TRANSFORMS defaults to false). You must explicitly set export UPLOAD_TRANSFORMS=true to enable automatic uploads.

To ship new or updated transform code:

  1. Edit the files in the appropriate county directory under transform/.
  2. Set export UPLOAD_TRANSFORMS=true before running the deploy script.
  3. Run ./scripts/deploy-infra.sh.
    • The script rebuilds workflow/lambdas/post/transforms/<county>.zip for each county.
    • It synchronizes those archives (plus a manifest) to the environment bucket under transforms/ and updates the TransformS3Prefix parameter used by the post Lambda.
  4. On first invocation per county, the post Lambda downloads s3://<prefix>/<county>.zip, caches it in /tmp/county-transforms/<county>.zip, verifies the MD5/ETag, and reuses the cache until the remote ETag changes.
  5. Subsequent deployments automatically refresh S3 and the Lambda detects updated archives by comparing ETags.

Alternative: If you don't want to upload scripts during deployment (or scripts already exist in S3), you can skip setting UPLOAD_TRANSFORMS and manually set TRANSFORM_S3_PREFIX_VALUE to point to the existing S3 location.

Tip: during local testing, remove /tmp/county-transforms to force a fresh download.
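The caching behavior in step 4 can be sketched as shell logic. This is a simplified sketch with illustrative helper names: the remote ETag is passed in as an argument (in the Lambda it would come from an S3 HeadObject call), and the download is stubbed out.

```shell
#!/usr/bin/env bash
# Sketch of the /tmp transform cache: reuse <county>.zip until the remote ETag changes.
set -euo pipefail

CACHE_DIR="$(mktemp -d)"   # the Lambda uses /tmp/county-transforms

cache_is_fresh() {
  local county="$1" remote_etag="$2"
  [[ -f "$CACHE_DIR/$county.zip" && -f "$CACHE_DIR/$county.etag" ]] || return 1
  [[ "$(cat "$CACHE_DIR/$county.etag")" == "$remote_etag" ]]
}

record_download() {
  local county="$1" remote_etag="$2"
  : > "$CACHE_DIR/$county.zip"                      # stand-in for the real S3 download
  printf '%s' "$remote_etag" > "$CACHE_DIR/$county.etag"
}

cache_is_fresh brevard etag-1 && echo "cache hit" || echo "cache miss"   # first run: miss
record_download brevard etag-1
cache_is_fresh brevard etag-1 && echo "cache hit" || echo "cache miss"   # unchanged ETag: hit
cache_is_fresh brevard etag-2 && echo "cache hit" || echo "cache miss"   # new ETag: re-download
```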

GitHub Integration for Transform Scripts

The system includes automatic GitHub integration that syncs transform scripts to the upstream repository https://github.com/elephant-xyz/Counties-trasform-scripts and creates pull requests.

How it works:

  1. When transform scripts are uploaded to S3 (via ./scripts/deploy-infra.sh), they are automatically synced to GitHub
  2. The Lambda function forks the repository if needed (on first run)
  3. Scripts are committed to a branch named auto-update/<county> (e.g., auto-update/brevard)
  4. A pull request is created (or updated if one already exists) for each county

Setup:

  1. Create a GitHub Personal Access Token:

    • Go to GitHub Settings → Developer settings → Personal access tokens → Tokens (classic)
    • Create a new token with repo scope (full control of private repositories)
    • Copy the token
  2. Set environment variables:

# Required for GitHub integration
export GITHUB_SECRET_NAME=github-token  # Name for the secret in AWS Secrets Manager
export GITHUB_USERNAME=your-github-username  # Your GitHub username
export GITHUB_TOKEN=ghp_xxxxxxxxxxxx  # Your GitHub personal access token

# Deploy
./scripts/deploy-infra.sh

What happens:

  • The deploy script creates/updates the GitHub token in AWS Secrets Manager
  • The Lambda function is configured with S3 event notifications for transforms/*.zip files
  • When a transform zip is uploaded, the Lambda:
    • Downloads and extracts the zip file
    • Normalizes the county name to match the GitHub repository structure
    • Creates or updates the branch auto-update/<county>
    • Commits scripts to <county>/scripts/ directory
    • Creates or updates a pull request to the upstream repository

Branch and PR Strategy:

  • One PR per county: Each county gets its own branch and PR
  • Branch naming: auto-update/<county> (e.g., auto-update/brevard, auto-update/palm-beach)
  • PR title: Update <county> transform scripts
  • Automatic updates: If a PR already exists for a county, subsequent uploads update the same PR

County Name Normalization:

The system automatically queries the GitHub repository to match county names with actual directory structures. This handles variations like:

  • Spaces in county names (e.g., "Palm Beach", "Santa Rosa")
  • Case differences
  • Directory naming conventions

Security:

  • GitHub token is stored securely in AWS Secrets Manager
  • Lambda function has minimal permissions (S3 read-only for transforms, Secrets Manager read-only)
  • Token requires repo scope for full repository access

Disabling GitHub Integration:

If you don't want to use GitHub integration, simply don't set the GITHUB_SECRET_NAME, GITHUB_USERNAME, or GITHUB_TOKEN environment variables. The Lambda function will not be created and S3 uploads will proceed normally.

3) Start the workflow

Use your input S3 bucket name:

# Use default workflow queue
./scripts/start-step-function.sh <your-bucket-name>

# Use county-specific queue
./scripts/start-step-function.sh --queue elephant-workflow-queue-escambia <your-bucket-name>

Input bucket names currently available:

  • elephant-input-breavard-county
  • elephant-input-broward-county
  • elephant-input-charlotte-county
  • elephant-input-duval-county
  • elephant-input-hillsborough-county
  • elephant-input-lake-county
  • elephant-input-lee-county
  • elephant-input-leon-county
  • elephant-input-manatee-county
  • elephant-input-palm-beach-county
  • elephant-input-pinellas-county
  • elephant-input-polk-county
  • elephant-input-santa-county

County-Specific Queue Setup

For better isolation and control, you can create separate queues for different counties. This allows you to:

  • Set different concurrency limits per county
  • Monitor county-specific processing separately
  • Isolate failures to individual counties
  • Process multiple counties in parallel with independent rate controls

Creating a County-Specific Queue

Use the create-county-queue.sh script to provision a complete queue setup:

./scripts/create-county-queue.sh <county-name>

Example: Create queue for Escambia County

./scripts/create-county-queue.sh escambia

With custom concurrency (default: 100):

MAX_CONCURRENCY=50 ./scripts/create-county-queue.sh escambia

What the script does:

  1. Creates Dead Letter Queue: elephant-workflow-queue-escambia-dlq

    • MessageRetentionPeriod: 1209600 seconds (14 days)
    • VisibilityTimeout: 331 seconds
    • SSE encryption enabled
  2. Creates Main Queue: elephant-workflow-queue-escambia

    • MessageRetentionPeriod: 1209600 seconds (14 days)
    • VisibilityTimeout: 331 seconds
    • SSE encryption enabled
    • RedrivePolicy configured with maxReceiveCount: 3
  3. Configures Lambda Event Source Mapping

    • Links queue to WorkflowStarterFunction
    • BatchSize: 1 (one message per invocation)
    • MaximumBatchingWindowInSeconds: 0 (process immediately)
    • MaximumConcurrency: 100 (or custom value)

Script output example:

[INFO] Creating county-specific workflow queue for: escambia
[INFO] Stack name: elephant-oracle-node
[INFO] Region: us-east-1
[STEP] Creating Dead Letter Queue: elephant-workflow-queue-escambia-dlq
[INFO] DLQ created: https://sqs.us-east-1.amazonaws.com/123456789012/elephant-workflow-queue-escambia-dlq
[INFO] DLQ ARN: arn:aws:sqs:us-east-1:123456789012:elephant-workflow-queue-escambia-dlq
[STEP] Creating workflow queue: elephant-workflow-queue-escambia
[INFO] Queue created: https://sqs.us-east-1.amazonaws.com/123456789012/elephant-workflow-queue-escambia
[INFO] Queue ARN: arn:aws:sqs:us-east-1:123456789012:elephant-workflow-queue-escambia
[STEP] Getting WorkflowStarterFunction ARN from stack: elephant-oracle-node
[INFO] Lambda Function ARN: arn:aws:lambda:us-east-1:123456789012:function:elephant-oracle-node-WorkflowStarterFunction-ABC123
[STEP] Creating event source mapping between queue and Lambda function
[INFO] Event source mapping created: 12345678-1234-1234-1234-123456789012

[STEP] ✓ County queue setup completed successfully!

[INFO] Summary:
  County Name:              escambia
  Queue Name:               elephant-workflow-queue-escambia
  Queue URL:                https://sqs.us-east-1.amazonaws.com/123456789012/elephant-workflow-queue-escambia
  Queue ARN:                arn:aws:sqs:us-east-1:123456789012:elephant-workflow-queue-escambia
  DLQ Name:                 elephant-workflow-queue-escambia-dlq
  DLQ URL:                  https://sqs.us-east-1.amazonaws.com/123456789012/elephant-workflow-queue-escambia-dlq
  DLQ ARN:                  arn:aws:sqs:us-east-1:123456789012:elephant-workflow-queue-escambia-dlq
  Lambda Function ARN:      arn:aws:lambda:us-east-1:123456789012:function:elephant-oracle-node-WorkflowStarterFunction-ABC123
  Event Source Mapping:     12345678-1234-1234-1234-123456789012
  Max Concurrency:          100

[INFO] You can now send messages to: https://sqs.us-east-1.amazonaws.com/123456789012/elephant-workflow-queue-escambia

Using County-Specific Queues

Process data through a county-specific queue:

# Start S3ToSqsStateMachine and populate the county-specific queue
./scripts/start-step-function.sh --queue elephant-workflow-queue-escambia elephant-input-escambia-county

Multiple counties example:

# Create queues for multiple counties
./scripts/create-county-queue.sh escambia
./scripts/create-county-queue.sh broward
MAX_CONCURRENCY=200 ./scripts/create-county-queue.sh miami-dade  # Higher volume county

# Process each county with its dedicated queue
./scripts/start-step-function.sh --queue elephant-workflow-queue-escambia elephant-input-escambia-county
./scripts/start-step-function.sh --queue elephant-workflow-queue-broward elephant-input-broward-county
./scripts/start-step-function.sh --queue elephant-workflow-queue-miami-dade elephant-input-miami-dade-county

How It Works

  1. S3ToSqsStateMachine lists S3 objects and sends messages to the specified queue
  2. WorkflowStarterFunction is triggered by the county-specific queue via event source mapping
  3. ElephantExpressWorkflow executes synchronously
  4. On failure, the message is automatically retried by SQS (maxReceiveCount: 3)
  5. After 3 failures, the message moves to the county-specific DLQ automatically

Error Handling

  • The state machine no longer manually requeues to DLQ
  • SQS event source mapping handles all retries and DLQ delivery
  • WorkflowStarterFunction throws errors on failure to trigger redelivery
  • Messages are only moved to DLQ after exhausting all retries
  • Each county's failures are isolated in its own DLQ
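
The retry behavior above comes from the source queue's redrive policy. As a minimal sketch (placeholder ARN, not the deployed template), the attribute that produces "three attempts, then DLQ" looks like this:

```shell
# Sketch only: a placeholder DLQ ARN standing in for the one printed by
# create-county-queue.sh.
DLQ_ARN="arn:aws:sqs:us-east-1:123456789012:elephant-workflow-queue-escambia-dlq"

# SQS stores RedrivePolicy as a JSON string on the source queue;
# maxReceiveCount: 3 means three delivery attempts before the DLQ.
REDRIVE_POLICY=$(printf '{"deadLetterTargetArn":"%s","maxReceiveCount":"3"}' "$DLQ_ARN")
echo "$REDRIVE_POLICY"

# With credentials, this is applied to an existing queue via:
#   aws sqs set-queue-attributes --queue-url "$QUEUE_URL" \
#     --attributes RedrivePolicy="$REDRIVE_POLICY"
```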

Monitoring County Queues

Check queue depth:

aws sqs get-queue-attributes \
  --queue-url $(aws sqs get-queue-url --queue-name elephant-workflow-queue-escambia --query 'QueueUrl' --output text) \
  --attribute-names ApproximateNumberOfMessages,ApproximateNumberOfMessagesNotVisible

Check DLQ depth:

aws sqs get-queue-attributes \
  --queue-url $(aws sqs get-queue-url --queue-name elephant-workflow-queue-escambia-dlq --query 'QueueUrl' --output text) \
  --attribute-names ApproximateNumberOfMessages

View event source mapping status:

aws lambda list-event-source-mappings \
  --function-name $(aws cloudformation describe-stack-resources \
    --stack-name elephant-oracle-node \
    --query "StackResources[?LogicalResourceId=='WorkflowStarterFunction'].PhysicalResourceId" \
    --output text) \
  --query "EventSourceMappings[?contains(EventSourceArn, 'elephant-workflow-queue-escambia')]"

Adjusting Concurrency

To change the concurrency for an existing queue, update the event source mapping:

# Get the event source mapping UUID
UUID=$(aws lambda list-event-source-mappings \
  --function-name $(aws cloudformation describe-stack-resources \
    --stack-name elephant-oracle-node \
    --query "StackResources[?LogicalResourceId=='WorkflowStarterFunction'].PhysicalResourceId" \
    --output text) \
  --query "EventSourceMappings[?contains(EventSourceArn, 'elephant-workflow-queue-escambia')].UUID" \
  --output text)

# Update the maximum concurrency
aws lambda update-event-source-mapping \
  --uuid $UUID \
  --scaling-config "MaximumConcurrency=50"

4) Pause your Airflow DAG

If you also run an Airflow pipeline for the same data, open the Airflow UI and toggle the DAG off (pause) to avoid duplicate processing.

Monitor the workflow

  • Step Functions: open AWS Console → Step Functions → State machines. The Express workflow name contains "ElephantExpressWorkflow". View current and recent executions.
  • Logs: CloudWatch Logs group /aws/vendedlogs/states/ElephantExpressWorkflow contains execution logs for the Express workflow.


Control concurrency

Throughput is governed by the SQS → Lambda trigger on WorkflowStarterFunction:

  • Batch size: number of SQS messages per Lambda invocation. Keep it small (often 1) so each execution processes a single job.
  • Reserved concurrency on the Lambda: caps how many executions run in parallel.

Use the AWS Console → Lambda → WorkflowStarterFunction → Configuration → Triggers (SQS) to adjust Batch size, and Configuration → Concurrency to set reserved concurrency.


Inspect failures

  • Step Functions Console: select your Express state machine → Executions → filter by Failed → open an execution to see the error and the failed state.
  • CloudWatch Logs: from the execution view, follow the log link to see state logs. You can also open the Lambda’s log groups for detailed stack traces.


Query Post-Processing Logs

The post-processing Lambda logs detailed execution metrics that can be aggregated and analyzed using the built-in query script. This provides county-level success rates, transaction counts, and failure breakdowns.

Query recent post-processing logs (last hour):

npm run query-post-logs

Query logs for a specific time range:

# Query logs from 2 hours ago to 1 hour ago
npm run query-post-logs -- --start 2025-09-24T14:00 --end 2025-09-24T15:00

# Query logs for the last 24 hours
npm run query-post-logs -- --start 2025-09-23T15:30 --end 2025-09-24T15:30

Query with custom AWS profile/region:

# Use specific AWS profile
AWS_PROFILE=your-profile npm run query-post-logs

# Use specific AWS region
npm run query-post-logs -- --region us-east-1

Sample output:

County: Lake
  Total executions: 60
  Success rate: 63.33% (38 successes / 60 runs)
  Total successful transactions: 76
  Failure counts by step:
    post_lambda_failed: 14
    validation_failed: 8

County: Brevard
  Total executions: 45
  Success rate: 71.11% (32 successes / 45 runs)
  Total successful transactions: 128
  Failure counts by step:
    transform_failed: 10
    hash_failed: 3

What the metrics mean:

  • Total executions: Number of post-processing Lambda invocations for this county
  • Success rate: Percentage of executions that completed successfully (generated transaction items)
  • Total successful transactions: Sum of all transaction items generated across successful runs
  • Failure counts by step: Breakdown of failures by the step where they occurred:
    • post_lambda_failed: General execution failures
    • validation_failed: Data validation errors
    • transform_failed: Data transformation errors
    • hash_failed: IPFS hashing errors
    • upload_failed: IPFS upload errors
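
For orientation, the success rate is simply successful runs divided by total executions; the Lake figure in the sample output works out as:

```shell
# Success rate = successful runs / total executions, as a percentage.
awk 'BEGIN { printf "%.2f%% (38 successes / 60 runs)\n", 38 / 60 * 100 }'
# 63.33% (38 successes / 60 runs)
```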

Command options:

Usage: npm run query-post-logs -- [--stack <stack-name>] [--start <ISO-8601>] [--end <ISO-8601>] [--profile <aws_profile>] [--region <aws_region>]

Options:
  --stack, -s       CloudFormation stack name (default: elephant-oracle-node)
  --start           ISO-8601 start time (default: one hour ago, UTC)
  --end             ISO-8601 end time (default: now, UTC)
  --profile         AWS profile for credentials (optional)
  --region          AWS region override (optional)
  --help            Show this help text

This query tool uses CloudWatch Logs Insights to efficiently analyze large volumes of log data and provides actionable metrics for monitoring post-processing performance across different counties.

Proxy Rotation

The system supports automatic proxy rotation for the prepare function. This helps distribute load across multiple proxies and prevents rate limiting or IP blocking.

Setup Proxies

1. Create a proxy file:

Create a text file with one proxy per line, in the format username:password@ip:port:

# Create proxies.txt
cat > proxies.txt <<EOF
user1:[email protected]:8080
user2:[email protected]:8080
user3:[email protected]:8080
EOF

You can use proxies.example.txt as a template.

2. Deploy with proxies:

# Initial deployment with proxies
export PROXY_FILE=proxies.txt
./scripts/deploy-infra.sh

3. Update proxies later (without full deployment):

# Update or add proxies anytime
./scripts/update-proxies.sh proxies.txt

How It Works

  • Each Lambda invocation automatically selects the least recently used proxy
  • Proxies are rotated automatically based on usage timestamps
  • Failed proxies are tracked but remain available for retry
  • No configuration needed - just provide the proxy file
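
The least-recently-used selection can be pictured with plain shell. Given hypothetical proxy/last-used pairs (the data below is made up; the Lambda does the equivalent against its stored usage timestamps), the proxy with the oldest timestamp wins:

```shell
# Made-up proxy/last-used pairs (epoch seconds) to illustrate LRU selection:
# sort ascending by timestamp and take the first entry.
printf '%s\n' \
  'user1:pass1@1.2.3.4:8080 1700000300' \
  'user2:pass2@5.6.7.8:8080 1700000100' \
  'user3:pass3@9.10.11.12:8080 1700000200' |
  sort -k2,2n | head -n 1 | cut -d' ' -f1
# user2:pass2@5.6.7.8:8080  (oldest timestamp, so least recently used)
```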

Verify Proxies

Check which proxies are configured:

./scripts/update-proxies.sh --list

Remove All Proxies

./scripts/update-proxies.sh --clear

Notes

  • Empty lines and comment lines (starting with #) are ignored
  • Proxy format must be: username:password@ip:port
  • Port must be numeric
  • If no proxies are configured, the system works normally without them
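
Since a malformed line only surfaces at runtime, it can help to sanity-check the file locally first. This is an unofficial sketch based on the rules above, not part of the repo's scripts:

```shell
# Unofficial helper: check each line of a proxy file against the
# username:password@ip:port format described above. Blank lines and
# #-comments are skipped, mirroring the documented behavior.
validate_proxies() {
  file="$1"; bad=0
  re='^[^:@ ]+:[^@ ]+@([0-9]{1,3}\.){3}[0-9]{1,3}:[0-9]+$'
  while IFS= read -r line; do
    case "$line" in ''|'#'*) continue ;; esac
    if ! printf '%s\n' "$line" | grep -Eq "$re"; then
      echo "invalid proxy line: $line" >&2
      bad=1
    fi
  done < "$file"
  return "$bad"
}

# Example: one good line, one comment, one malformed line.
printf 'user1:pass1@1.2.3.4:8080\n# staging proxies\nnot-a-proxy\n' > /tmp/proxies-check.txt
validate_proxies /tmp/proxies-check.txt || echo "fix the file before running update-proxies.sh"
```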

That's it — set env vars, deploy, start, monitor, and tune concurrency.

Dead Letter Queue (DLQ) Management

Reprocessing Failed Messages

When messages repeatedly fail processing, they are moved to Dead Letter Queues. After fixing the underlying issue, you can reprocess these messages back to the main queue:

Reprocess Workflow DLQ messages:

./scripts/reprocess-dlq.sh --dlq-type workflow --batch-size 10 --max-messages 100

Reprocess Transactions DLQ messages:

./scripts/reprocess-dlq.sh --dlq-type transactions --max-messages 1000

Reprocess County-Specific DLQ messages:

County-specific queues have their own DLQs. To reprocess failed messages from a county-specific DLQ:

# Get the DLQ URL for the county
DLQ_URL=$(aws sqs get-queue-url --queue-name elephant-workflow-queue-escambia-dlq --query 'QueueUrl' --output text)
TARGET_URL=$(aws sqs get-queue-url --queue-name elephant-workflow-queue-escambia --query 'QueueUrl' --output text)

# Use AWS SQS redrive to move messages back
aws sqs start-message-move-task \
  --source-arn $(aws sqs get-queue-attributes --queue-url $DLQ_URL --attribute-names QueueArn --query 'Attributes.QueueArn' --output text) \
  --destination-arn $(aws sqs get-queue-attributes --queue-url $TARGET_URL --attribute-names QueueArn --query 'Attributes.QueueArn' --output text)

DLQ Types:

  • workflow - Workflow Dead Letter Queue → Workflow SQS Queue
  • transactions - Transactions Dead Letter Queue → Transactions SQS Queue
  • County-specific: elephant-workflow-queue-<county>-dlq → elephant-workflow-queue-<county>

Parameters:

  • --dry-run - Show what would be done without moving messages
  • --batch-size - Messages per batch (default: 10)
  • --max-messages - Maximum messages to reprocess (default: 100)
  • --verbose - Detailed output

Error Handling Changes:

As of the latest update, the system uses SQS-native error handling:

  • Messages are automatically retried by SQS (maxReceiveCount: 3)
  • Failed messages move to DLQ automatically after exhausting retries
  • No manual requeuing logic in the state machine
  • WorkflowStarterFunction throws errors to trigger SQS redelivery
