feat: add wms_oracle community connector#26
Conversation
Adds a generalized Oracle WMS REST API connector syncing 26 warehouse entities with two-phase incremental sync, descending historical backfill across rolling 30-day windows, pre-cursor hourly drift detection, and parallel entity processing via ThreadPoolExecutor. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Require https:// in validate_configuration (was accepting http://) - Remove dead code: apply_lookback(), INCREMENTAL_LOOKBACK_MINUTES, timedelta import - Remove lag_minutes throughout (config, code, docs) — default was 0, adds confusion - Fix page_size default inconsistency: configuration.json now shows 1000 to match utils.py and README - Delete requirements.txt — requests is pre-installed in the SDK runtime - Fix INCREMENTAL_CHECKPOINT_INTERVAL_SECONDS formatting (Black parens artifact) - README: expand Data handling with mod_ts detection mechanics and adding-entity instructions - README: add Pre-cursor drift check section with full mechanics explanation - README: expand monitoring tables with column-level documentation and usage notes Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Remove unused datetime imports (timezone, timedelta) from connector.py introduced when lag_minutes was removed - Wrap all E501 log message f-strings across api.py, backfill.py, connector.py, incremental.py, and pre_sync_drift_check.py - Apply black reformatting to utils.py (INCREMENTAL_CHECKPOINT_INTERVAL_SECONDS) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Black always generates W503-style line breaks (operator at start of continuation line). Without this, any connector using multi-line boolean conditions will fail the linting check. W503 and W504 are mutually exclusive — suppressing W503 is the standard fix for Black + flake8 coexistence. Also removes noqa: W503 comments added as a temporary workaround. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The comments were adding enough length to trigger Black reformatting. W503 is now globally ignored via .flake8 so the comments are redundant. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Adds a new wms_oracle/ community connector example that syncs Oracle WMS Cloud REST API entities into Fivetran, including incremental sync + historical backfill logic and pre-sync drift monitoring, and wires it into the repository documentation.
Changes:
- Added a full Oracle WMS connector implementation split across
connector.pyand helper modules (api.py,incremental.py,backfill.py,pre_sync_drift_check.py,utils.py). - Added connector documentation (
wms_oracle/README.md) and example config (wms_oracle/configuration.json). - Updated repository-level docs/linting (
README.md,.flake8).
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 19 comments.
Show a summary per file
| File | Description |
|---|---|
| wms_oracle/connector.py | Main connector entrypoint, schema, and sync orchestration (incremental/backfill/drift-check). |
| wms_oracle/api.py | Oracle WMS REST API request + pagination utilities and capability probing. |
| wms_oracle/incremental.py | Two-phase incremental sync implementation (mod_ts + create_ts catch-up). |
| wms_oracle/backfill.py | Historical backfill implementation using rolling windows and DESC pagination. |
| wms_oracle/pre_sync_drift_check.py | Pre-sync hourly drift detection and monitoring table writes. |
| wms_oracle/utils.py | Constants, entity list, configuration validation, timestamp utilities. |
| wms_oracle/configuration.json | Example connector configuration template. |
| wms_oracle/README.md | Connector example documentation. |
| README.md | Adds wms_oracle to the catalog list. |
| .flake8 | Updates global flake8 ignore list. |
| { | ||
| "base_url": "https://<YOUR_REGION>.wms.ocs.oraclecloud.com/<YOUR_ORG>", | ||
| "username": "<YOUR_USERNAME>", | ||
| "password": "<YOUR_PASSWORD>", | ||
| "page_size": "1000", | ||
| "max_pages": "100", | ||
| "lookback_check_hours": "24", | ||
| "test_entities": "" | ||
| } |
| { | ||
| "base_url": "https://<YOUR_REGION>.wms.ocs.oraclecloud.com/<YOUR_ORG>", | ||
| "username": "<YOUR_USERNAME>", | ||
| "password": "<YOUR_PASSWORD>", | ||
| "page_size": "1000", | ||
| "max_pages": "100", | ||
| "lookback_check_hours": "24", | ||
| "test_entities": "" | ||
| } |
| def handle_records(records: list): | ||
| for record in records: | ||
| # Upsert each record into the destination table, inserting or updating as needed. | ||
| op.upsert(table=entity, data=record) | ||
|
|
| with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_ENTITIES) as executor: | ||
| futures = {executor.submit(fetch_count, task): task for task in tasks} | ||
| for future in as_completed(futures): | ||
| entity, hour_str, gte, lt, is_partial, count = future.result() | ||
|
|
||
| # Full windows: compare and save to state | ||
| if not is_partial: | ||
| prev_count = prev_hourly_counts.get(entity, {}).get(hour_str) | ||
| if prev_count is not None and count > prev_count: | ||
| log.warning( | ||
| f"Pre-cursor drift: {entity} {hour_str} count increased " | ||
| f"{prev_count} → {count}" | ||
| ) | ||
| hours_to_repull.append((entity, hour_str, gte, lt, prev_count, count)) | ||
|
|
||
| if entity not in new_counts: | ||
| new_counts[entity] = {} | ||
| new_counts[entity][hour_str] = count | ||
|
|
||
| # Upsert hourly count into the monitoring table for observability. | ||
| op.upsert( | ||
| "pre_cursor_hourly_counts", | ||
| { | ||
| "table_name": entity, | ||
| "hour_start": hour_str, | ||
| "record_count": count, | ||
| "batch_id": batch_id, | ||
| "is_partial": is_partial, | ||
| }, | ||
| ) |
| if page_max_ts: | ||
| if incremental_max_mod_ts is None or page_max_ts > incremental_max_mod_ts: | ||
| incremental_max_mod_ts = page_max_ts | ||
| page_max_dt = datetime.fromisoformat(page_max_ts) |
| and page_max_ts is not None | ||
| and page_max_ts < bf_prev_page_min_ts | ||
| ): | ||
| cursor_dt = datetime.fromisoformat(bf_prev_page_min_ts) |
| (total_records, incremental_max_mod_ts) | ||
| incremental_max_mod_ts is None if no records were returned in Phase 1. | ||
| """ | ||
| cursor_dt = datetime.fromisoformat(cursor) |
| # ── Pass 3: Sequential incremental for entities with no backfill ────── | ||
| # Entities that finished their backfill before this sync and weren't | ||
| # covered by pass 1 (i.e. incremental_only entities not yet processed). | ||
| for entity in incremental_only: | ||
| if entity not in completed_entities: | ||
| try: | ||
| record_result(submit_entity(entity)) | ||
| except Exception as e: | ||
| log.error(f"Error processing {entity}: {e}") | ||
| all_success = False | ||
| completed_entities.add(entity) |
| except Exception as e: | ||
| log.warning(f"Could not check mod_ts for {entity}: {e}. Assuming no mod_ts support.") | ||
| return False |
|
|
||
| ## Tables created | ||
|
|
||
| 26 warehouse entity tables (all with primary key `id`): |
fivetran-JenasVimal
left a comment
There was a problem hiding this comment.
Please Address all of the copilot comments
| doctests = true | ||
| select = C,E,F,W,B,B950 | ||
| extend-ignore = E203, E501, B008 | ||
| extend-ignore = E203, E501, B008, W503 |
There was a problem hiding this comment.
Why are we removing this ?
This need permission from the Team before removal @fivetran-sahilkhirwal
W503 is a flake8 warning about line breaks around binary operators, for example:
result = (
a
+ b
)
| ```json | ||
| { | ||
| "base_url": "https://<YOUR_REGION>.wms.ocs.oraclecloud.com/<YOUR_ORG>", | ||
| "username": "<YOUR_USERNAME>", | ||
| "password": "<YOUR_PASSWORD>", | ||
| "page_size": "1000", | ||
| "max_pages": "100", | ||
| "lookback_check_hours": "24", | ||
| "test_entities": "" | ||
| } |
There was a problem hiding this comment.
please add placeholders for this , refer to copilot suggestion
| import time | ||
| import requests | ||
| from typing import Optional, Tuple | ||
|
|
||
| from fivetran_connector_sdk import Logging as log | ||
|
|
||
| from utils import ( | ||
| API_VERSION, | ||
| DEFAULT_PAGE_SIZE, | ||
| MIN_PAGE_SIZE, | ||
| CHECKPOINT_INTERVAL_PAGES, | ||
| MAX_RETRIES, | ||
| INITIAL_BACKOFF_SECONDS, | ||
| OrderingNotSupportedError, | ||
| normalize_timestamp_to_oracle_format, | ||
| ) |
There was a problem hiding this comment.
please add comments for all of them , above the imports
Refer to the template_connector
|
|
||
|
|
||
| def check_entity_has_mod_ts(base_url: str, username: str, password: str, entity: str) -> bool: | ||
| """Return True if the entity's describe endpoint lists a mod_ts field.""" |
There was a problem hiding this comment.
all doc strings must have ,
Description
Args and Returns / Raises
| endpoint = f"{base_url}/wms/lgfapi/{API_VERSION}/entity/{entity}/describe" | ||
| try: | ||
| response = requests.get( | ||
| endpoint, params={"format": "json"}, auth=(username, password), timeout=60 | ||
| ) | ||
| response.raise_for_status() | ||
| return "mod_ts" in response.json().get("fields", {}) | ||
| except Exception as e: | ||
| log.warning(f"Could not check mod_ts for {entity}: {e}. Assuming no mod_ts support.") | ||
| return False |
There was a problem hiding this comment.
Please add retry for transient methods like
408 Request Timeout
429 Too Many Requests
500 Internal Server Error
502 Bad Gateway
503 Service Unavailable
504 Gateway Timeout
ConnectionError
ReadTimeout
| Fetch page 1 at page_size=1 to read result_count without loading records. | ||
| Used to sort entities largest-first before submitting to the thread pool. | ||
| Returns 0 on any error so the entity sorts to the back. | ||
| """ |
There was a problem hiding this comment.
please follow docstring format
| import json | ||
| import requests | ||
| from datetime import datetime | ||
| from typing import Optional | ||
| import time | ||
| from concurrent.futures import ThreadPoolExecutor, as_completed | ||
| from threading import Lock | ||
|
|
||
| from utils import ( | ||
| ORACLE_WMS_ENTITIES, | ||
| DEFAULT_PAGE_SIZE, | ||
| DEFAULT_MAX_PAGES, | ||
| MAX_CONCURRENT_ENTITIES, | ||
| OrderingNotSupportedError, | ||
| validate_configuration, | ||
| get_current_timestamp, | ||
| to_utc, | ||
| ) | ||
| from api import check_entity_has_mod_ts, probe_entity_count, fetch_entity_data | ||
| from incremental import run_incremental_phase | ||
| from backfill import run_backfill_phase | ||
| from pre_sync_drift_check import run_pre_cursor_hourly_check, run_daily_counts |
There was a problem hiding this comment.
please add small comments for these as well , as it will be helpful for the customers to understand
| """ | ||
| Entry point called by Fivetran on each sync. | ||
|
|
||
| State structure: | ||
| entity_cursors: {entity: timestamp} — incremental cursor per entity | ||
| entity_backfill_cursors: {entity: timestamp} — backfill window anchor per entity; | ||
| absent once backfill is complete | ||
| entity_mod_ts_support: {entity: bool} — cached describe-endpoint results | ||
| sync_in_progress: bool — True in mid-sync checkpoints, False in final checkpoint | ||
| """ |
There was a problem hiding this comment.
please use template_connector doc string for methods like schema , update etc
"""
Define the update function, which is a required function, and is called by Fivetran during each sync.
See the technical reference documentation for more details on the update function
https://fivetran.com/docs/connectors/connector-sdk/technical-reference#update
Args:
configuration: A dictionary containing connection details
state: A dictionary containing state information from previous runs
The state dictionary is empty for the first sync or for any full re-sync
"""
| sync_in_progress: bool — True in mid-sync checkpoints, False in final checkpoint | ||
| """ | ||
| sync_wall_start = time.time() | ||
| log.info("Oracle WMS Connector: Starting sync") |
There was a problem hiding this comment.
@fivetran-clgritton please address this co-pilot comment as well
Summary
wms_oracle/— a generalized Oracle WMS Cloud REST API connector syncing 26 warehouse entities (orders, inventory, containers, purchasing documents, and more)wms_oracle/README.mdwith full documentation following the repo templateREADME.mdto listwms_oracleunder SaaS & APIsConnector highlights
mod_tscursor-advancement (Phase 1) +create_tscatch-up for backdated records (Phase 2)ThreadPoolExecutorfor backfill andmod_tscapability discoverypage_sizeon timeout and recalculates offset to resume without data losscounts_by_day,pre_cursor_hourly_counts) written each sync for observabilityTest plan
fivetran debug --configuration=configuration.jsonwith a real Oracle WMS instance and confirmwarehouse.dbis populatedgrep -ri "simfoods\|simmons")flake8passes (W503 only, pre-existing repo-wide)black --checkpasses🤖 Generated with Claude Code