diff --git a/.flake8 b/.flake8 index e008df1..ff9b060 100644 --- a/.flake8 +++ b/.flake8 @@ -4,7 +4,7 @@ show-source = true statistics = false doctests = true select = C,E,F,W,B,B950 -extend-ignore = E203, E501, B008 +extend-ignore = E203, E501, B008, W503 exclude = .git,__pycache__,.mypy_cache,.pytest_cache per-file-ignores = tests/*:D102,D104,F401 diff --git a/README.md b/README.md index 1e246b3..728e848 100644 --- a/README.md +++ b/README.md @@ -150,6 +150,7 @@ For SDK installation and setup, visit the main [Fivetran Connector SDK repositor - **[veeva_vault/basic_auth](https://github.com/fivetran/community_connectors/tree/main/veeva_vault/basic_auth)** - Authenticate to Veeva Vault with basic auth - **[veeva_vault/session_id_auth](https://github.com/fivetran/community_connectors/tree/main/veeva_vault/session_id_auth)** - Authenticate to Veeva Vault with session ID - **[vercel](https://github.com/fivetran/community_connectors/tree/main/vercel)** - Sync deployment data from Vercel REST API +- **[wms_oracle](https://github.com/fivetran/community_connectors/tree/main/wms_oracle)** - Sync warehouse management data from Oracle WMS REST API with incremental sync, historical backfill, and pre-cursor drift detection across 26 entities. - **[weights_and_biases](https://github.com/fivetran/community_connectors/tree/main/weights_and_biases)** - This example shows how to sync machine learning experiment tracking data from Weights & Biases (W&B), including projects, runs (experiments), and artifacts (models and datasets), using the Fivetran Connector SDK The connector uses the W&B Python SDK to retrieve data and implements robust error handling. You need to provide your W&B API key and entity name for this example to work. - **[zigpoll](https://github.com/fivetran/community_connectors/tree/main/zigpoll)** - Sync polling data from Zigpoll diff --git a/wms_oracle/README.md b/wms_oracle/README.md new file mode 100644 index 0000000..3c84251 --- /dev/null +++ b/wms_oracle/README.md @@ -0,0 +1,196 @@ +# Oracle WMS Connector Example + +## Connector overview + +This connector syncs warehouse management data from the Oracle WMS Cloud REST API to a Fivetran destination. It supports 26 warehouse entities including orders, inventory, containers, and purchasing documents, plus two monitoring tables that record daily volume probes and hourly drift counts. + +The connector uses a two-phase incremental strategy per entity: Phase 1 advances a `mod_ts` cursor forward in time, and Phase 2 catches up records that were created after the cursor with a backdated `mod_ts`. Historical backfill runs in descending order across rolling 30-day windows so recent data reaches the destination first. A pre-cursor hourly drift check runs before each sync to detect and re-pull any records modified in already-advanced windows. Entities with active backfills run in parallel via `ThreadPoolExecutor`; incremental-only entities run sequentially to avoid checkpoint contention. + + +## Requirements + +- [Supported Python versions](https://github.com/fivetran/community_connectors/blob/main/README.md#requirements) +- Operating system: + - Windows: 10 or later (64-bit only) + - macOS: 13 (Ventura) or later (Apple Silicon [arm64] or Intel [x86_64]) + - Linux: Distributions such as Ubuntu 20.04 or later, Debian 10 or later, or Amazon Linux 2 or later (arm64 or x86_64) +- An Oracle WMS Cloud instance with REST API access and a service account with read permissions on the entities you want to sync + + +## Getting started + +Refer to the [Connector SDK Setup Guide](https://fivetran.com/docs/connectors/connector-sdk/setup-guide) to get started. + +To initialize a new Connector SDK project using this connector as a starting point, run: + +``` +fivetran init --template wms_oracle +``` + +`fivetran init` initializes a new Connector SDK project by setting up the project structure, configuration files, and a connector you can run immediately with `fivetran debug`. For more information on `fivetran init`, refer to the [Connector SDK `init` documentation](https://fivetran.com/docs/connector-sdk/connector-development-and-configuration/connector-sdk-commands#fivetraninit). + +> Note: Ensure you have updated the `configuration.json` file with the necessary parameters before running `fivetran debug`. See the [Configuration file](#configuration-file) section for details on the required configuration parameters. + + +## Features + +- Two-phase incremental sync per entity: `mod_ts` cursor-advancement (Phase 1) plus `create_ts` catch-up for backdated records (Phase 2) +- Descending historical backfill across rolling 30-day windows, newest data first +- Pre-cursor hourly drift check: probes the 24 clock-aligned hours before each entity's cursor each sync and re-pulls any hour whose count increased +- Parallel entity processing via `ThreadPoolExecutor` for backfill and mod_ts capability discovery +- Adaptive page sizing: automatically halves `page_size` on timeout and recalculates the offset to resume without data loss +- Automatic full-scan fallback for entities that do not support DESC ordering +- `mod_ts` support discovered once per entity and cached in state, avoiding repeated describe-endpoint calls +- Entities sorted largest-first before processing using a lightweight count probe +- Two monitoring tables (`counts_by_day`, `pre_cursor_hourly_counts`) written each sync for observability + + +## Configuration file + +```json +{ + "base_url": "https://.wms.ocs.oraclecloud.com/", + "username": "", + "password": "", + "page_size": "1000", + "max_pages": "100", + "lookback_check_hours": "24", + "test_entities": "" +} +``` + +| Key | Required | Description | +|-----|----------|-------------| +| `base_url` | Yes | Base URL of your Oracle WMS instance, e.g. `https://region.wms.ocs.oraclecloud.com/org` | +| `username` | Yes | Oracle WMS service account username | +| `password` | Yes | Oracle WMS service account password | +| `page_size` | No | Records per page (default `1000`). Reduce if timeouts occur; the connector also adapts automatically | +| `max_pages` | No | Soft page limit per entity per sync for backfill (default `100`). The connector continues past this limit until the current timestamp group is fully consumed | +| `lookback_check_hours` | No | Number of hours before each entity's cursor to probe for drift (default `24`) | +| `test_entities` | No | Comma-separated list of entity names to sync; leave empty to sync all entities | + +> Note: When submitting connector code as a [Community Connector](https://github.com/fivetran/community_connectors/tree/main) in the open-source [Connector SDK repository](https://github.com/fivetran/community_connectors/tree/main), ensure the `configuration.json` file has placeholder values. When adding the connector to your production repository, ensure that the `configuration.json` file is not checked into version control to protect sensitive information. + + +## Authentication + +The connector uses HTTP Basic Authentication. Provide your Oracle WMS service account `username` and `password` in `configuration.json`. All requests are made over HTTPS. + + +## Pagination + +The Oracle WMS REST API uses offset-based pagination. The connector requests pages sequentially using `page` and `page_size` parameters. `page_count` is read from the first response and used to bound the loop; if Oracle reduces `page_count` mid-stream (stale cache behaviour), the lower value is accepted immediately to avoid phantom last-page 500 errors. + +The `max_pages` configuration key sets a soft limit for backfill fetches per entity per sync. When this limit is reached mid-timestamp-group, the connector continues fetching until the timestamp changes before checkpointing, ensuring no records at a boundary timestamp are skipped. + +On timeout, the connector halves `page_size` (down to a minimum of 25) and recalculates the current page number to preserve the same record offset. This reduction persists for the remainder of that entity's fetch. + + +## Data handling + +The connector determines whether each entity supports incremental sync by calling the Oracle WMS describe endpoint on the entity's first sync. If the response includes a `mod_ts` field, the entity uses cursor-based incremental sync; otherwise it receives a full scan each sync, preceded by `op.truncate()` to soft-delete records removed from the source. The describe result is cached in connector state so the call is only made once per entity. To force re-detection — for example, after an Oracle WMS upgrade that adds `mod_ts` support to an entity — remove that entity's entry from the `mod_ts_support` key in connector state. + +Note: a crash between `op.truncate()` and the first `op.upsert()` for a full-scan entity leaves the destination table empty until the next sync completes a full re-fetch. + +To add an entity, append its Oracle WMS API name to `ORACLE_WMS_ENTITIES` in `utils.py` and add a corresponding entry to the `schema()` function in `connector.py`: + +```python +{"table": "new_entity", "primary_key": ["id"]} +``` + +On the first sync after adding an entity the connector automatically detects the appropriate sync strategy. The `inventory_history` entity is included in `utils.py` as a commented-out example — uncomment it if your Oracle WMS instance supports this entity. + +Each record is delivered via `op.upsert()` using `id` as the primary key. The two monitoring tables use composite primary keys: + +- `counts_by_day`: `(table_name, mod_ts_day, batch_id)` +- `pre_cursor_hourly_counts`: `(table_name, hour_start, batch_id)` + +Timestamps are normalized to second precision before being used as Oracle WMS query parameters, as the API rejects sub-second values. + + +## Pre-cursor drift check + +Before each sync, the connector probes the Oracle WMS record count for each clock-aligned hourly window in the `lookback_check_hours` period immediately before each entity's incremental cursor. These counts are compared against the counts recorded during the prior sync. If a window's count has increased — indicating a long-running transaction that committed with a `mod_ts` inside an already-advanced window — the connector re-pulls all records for that window and upserts them before the main sync begins. + +A partial window (the sub-hour gap between the last full clock-aligned hour and the cursor's exact position) is probed and written to the `pre_cursor_hourly_counts` monitoring table for visibility, but is never compared against prior counts. This window grows legitimately each sync as the cursor advances within the current hour; comparing it would produce false positives. + +After each re-pull, the connector probes the count again to verify it matches the value that triggered the re-pull. A mismatch is logged as a warning, indicating the data may still be in flux. + +The `lookback_check_hours` configuration key controls how many hours are probed (default `24`). Transactions delayed by less than approximately one hour fall within the current-hour partial window and are not compared against prior counts. + + +## Error handling + +- Transient request failures are retried up to 5 times with exponential backoff starting at 1 second — refer to `make_api_request()` in `api.py`. +- Entities that return HTTP 400 for a given ordering parameter raise `OrderingNotSupportedError`, which bypasses retry and falls back to an unordered full scan. +- Timeouts trigger adaptive page-size reduction rather than a hard failure, allowing large entities to complete at a smaller page size. +- Per-entity failures are caught and logged without aborting the sync; partial progress is checkpointed so the next sync retries only the failed entity. + + +## Tables created + +26 warehouse entity tables (all with primary key `id`): + +| Table | Description | +|-------|-------------| +| `allocation` | Inventory allocations to orders | +| `batch_number` | Lot/batch tracking numbers | +| `company` | Company master records | +| `container` | Physical storage containers | +| `container_lock_xref` | Container lock cross-references | +| `facility` | Warehouse facility records | +| `history_activity` | Warehouse activity history | +| `ib_container` | Inbound containers | +| `ib_shipment` | Inbound shipment headers | +| `ib_shipment_dtl` | Inbound shipment detail lines | +| `inventory` | Current inventory positions | +| `inventory_attribute` | Inventory attribute values | +| `inventory_lock` | Inventory lock records | +| `inventory_status` | Inventory status codes | +| `item` | Item master records | +| `item_metric` | Item measurement metrics | +| `location` | Warehouse location master | +| `order_dtl` | Outbound order detail lines | +| `order_hdr` | Outbound order headers | +| `order_status` | Order status codes | +| `order_type` | Order type codes | +| `purchase_order_dtl` | Purchase order detail lines | +| `purchase_order_hdr` | Purchase order headers | +| `purchase_order_status` | Purchase order status codes | +| `putaway_type` | Putaway type codes | +| `vendor` | Vendor master records | + +2 monitoring tables written each sync for observability: + +`counts_by_day` records the number of records with a `mod_ts` on each calendar day for the last 30 days per entity. Use it to track daily modification volume, detect unexpected drops or spikes, and verify that recent days are receiving writes. + +| Column | Primary key | Description | +|--------|-------------|-------------| +| `table_name` | Yes | Entity name | +| `mod_ts_day` | Yes | Calendar day (`YYYY-MM-DD`) | +| `batch_id` | Yes | Sync start timestamp; identifies which sync wrote the row | +| `record_count` | No | Number of records with a `mod_ts` on this day | + +`pre_cursor_hourly_counts` records `mod_ts` counts for each clock-aligned hourly window in the drift-check lookback period before each entity's cursor. Use it to audit drift-check activity: compare `record_count` across `batch_id` values for the same `(table_name, hour_start)` to see which hours increased between syncs and triggered a re-pull. + +| Column | Primary key | Description | +|--------|-------------|-------------| +| `table_name` | Yes | Entity name | +| `hour_start` | Yes | UTC hour window start (ISO format) | +| `batch_id` | Yes | Sync start timestamp; identifies which sync wrote the row | +| `record_count` | No | Number of records with a `mod_ts` in this window | +| `is_partial` | No | `true` for the sub-hour gap between the last full clock-aligned hour and the exact cursor position — written for visibility only, not used for drift comparison | + + +## Additional files + +- `api.py` – Oracle WMS REST API client: single-page requests with retry and exponential backoff, multi-page pagination with adaptive page sizing, entity count probes, and mod_ts capability discovery. +- `utils.py` – Constants, the entity list, `OrderingNotSupportedError`, configuration validation, and timestamp utility functions. +- `incremental.py` – Two-phase incremental sync logic: Phase 1 `mod_ts` cursor-advancement and Phase 2 `create_ts` catch-up for backdated records. +- `backfill.py` – Historical backfill logic: descending offset pagination in rolling 30-day windows with timeout rollback and consecutive-empty-window termination. +- `pre_sync_drift_check.py` – Pre-cursor hourly drift detection: probes counts for clock-aligned hourly windows before each entity's cursor, compares against prior-sync counts, and re-pulls any hour whose count increased. + + +## Additional considerations + +The examples provided are intended to help you effectively use Fivetran's Connector SDK. While we've tested the code, Fivetran cannot be held responsible for any unexpected or negative consequences that may arise from using these examples. For inquiries, please reach out to our Support team. diff --git a/wms_oracle/api.py b/wms_oracle/api.py new file mode 100644 index 0000000..cd5bfe7 --- /dev/null +++ b/wms_oracle/api.py @@ -0,0 +1,374 @@ +"""Oracle WMS REST API client: single-page requests, multi-page fetching, and entity probing.""" + +import time +import requests +from typing import Optional, Tuple + +from fivetran_connector_sdk import Logging as log + +from utils import ( + API_VERSION, + DEFAULT_PAGE_SIZE, + MIN_PAGE_SIZE, + CHECKPOINT_INTERVAL_PAGES, + MAX_RETRIES, + INITIAL_BACKOFF_SECONDS, + OrderingNotSupportedError, + normalize_timestamp_to_oracle_format, +) + +# ── Entity capability probe ─────────────────────────────────────────────────── + + +def check_entity_has_mod_ts(base_url: str, username: str, password: str, entity: str) -> bool: + """Return True if the entity's describe endpoint lists a mod_ts field.""" + endpoint = f"{base_url}/wms/lgfapi/{API_VERSION}/entity/{entity}/describe" + try: + response = requests.get( + endpoint, params={"format": "json"}, auth=(username, password), timeout=60 + ) + response.raise_for_status() + return "mod_ts" in response.json().get("fields", {}) + except Exception as e: + log.warning(f"Could not check mod_ts for {entity}: {e}. Assuming no mod_ts support.") + return False + + +# ── Single-page request ─────────────────────────────────────────────────────── + + +def make_api_request( + base_url: str, + username: str, + password: str, + entity: str, + page: int = 1, + mod_ts_filter: Optional[str] = None, + mod_ts_lt_filter: Optional[str] = None, + page_size: int = DEFAULT_PAGE_SIZE, + ordering: Optional[str] = None, + session: Optional[requests.Session] = None, + create_ts_gte_filter: Optional[str] = None, + create_ts_lt_filter: Optional[str] = None, + fields: Optional[str] = None, +) -> dict: + """ + Make a single paged request to the Oracle WMS entity endpoint with retry logic. + + Args: + mod_ts_filter: mod_ts__gte — lower bound for incremental (ASC) queries + mod_ts_lt_filter: mod_ts__lt — upper bound for backfill (DESC) queries + ordering: e.g. "mod_ts,id" (ASC) or "-mod_ts,id" (DESC) + create_ts_gte_filter: create_ts__gte — used in Phase 1b to catch backdated records + create_ts_lt_filter: create_ts__lt — upper bound for Phase 1b (sync_start_time) + session: Optional Session for connection reuse across pages + + Raises: + OrderingNotSupportedError: if the entity returns 400 for the given ordering + (never retried). + requests.exceptions.Timeout: propagated immediately for adaptive page-size + handling upstream. + requests.exceptions.RequestException: after MAX_RETRIES exhausted. + """ + params = { + "format": "json", + "page": str(page), + "page_size": str(page_size), + } + if mod_ts_filter: + params["mod_ts__gte"] = mod_ts_filter + if mod_ts_lt_filter: + params["mod_ts__lt"] = mod_ts_lt_filter + if create_ts_gte_filter: + params["create_ts__gte"] = create_ts_gte_filter + if create_ts_lt_filter: + params["create_ts__lt"] = create_ts_lt_filter + if ordering: + params["ordering"] = ordering + if fields: + params["fields"] = fields + + endpoint = f"{base_url}/wms/lgfapi/{API_VERSION}/entity/{entity}" + requester = session or requests + last_exception = None + + for attempt in range(1, MAX_RETRIES + 1): + try: + response = requester.get( + endpoint, params=params, auth=(username, password), timeout=60 + ) + + if response.status_code == 400 and ordering: + raise OrderingNotSupportedError( + f"{entity} does not support ordering='{ordering}' (400 Bad Request)" + ) + if response.status_code == 404: + return { + "result_count": 0, + "page_count": 1, + "page_nbr": page, + "next_page": None, + "previous_page": None, + "results": [], + } + + response.raise_for_status() + return response.json() + + except (OrderingNotSupportedError, requests.exceptions.Timeout): + raise # Caller handles these — no retry + except requests.exceptions.RequestException as e: + last_exception = e + if attempt < MAX_RETRIES: + backoff = INITIAL_BACKOFF_SECONDS * (2 ** (attempt - 1)) + log.warning( + f"API request failed for {entity} page {page} " + f"(attempt {attempt}/{MAX_RETRIES}). Retrying in {backoff}s…" + ) + time.sleep(backoff) + else: + log.error( + f"API request failed for {entity} page {page} " + f"after {MAX_RETRIES} attempts: {e}" + ) + + raise last_exception + + +# ── Result count probe ──────────────────────────────────────────────────────── + + +def probe_entity_count( + base_url: str, + username: str, + password: str, + entity: str, + mod_ts_filter: Optional[str] = None, + mod_ts_lt_filter: Optional[str] = None, + ordering: Optional[str] = None, +) -> int: + """ + Fetch page 1 at page_size=1 to read result_count without loading records. + Used to sort entities largest-first before submitting to the thread pool. + Returns 0 on any error so the entity sorts to the back. + """ + try: + response = make_api_request( + base_url, + username, + password, + entity, + page=1, + page_size=1, + ordering=ordering, + mod_ts_filter=( + normalize_timestamp_to_oracle_format(mod_ts_filter) if mod_ts_filter else None + ), + mod_ts_lt_filter=( + normalize_timestamp_to_oracle_format(mod_ts_lt_filter) + if mod_ts_lt_filter + else None + ), + ) + return response.get("result_count", 0) or 0 + except Exception: + return 0 + + +# ── Multi-page fetch ────────────────────────────────────────────────────────── + + +def fetch_entity_data( + base_url: str, + username: str, + password: str, + entity: str, + mod_ts_filter: Optional[str] = None, + mod_ts_lt_filter: Optional[str] = None, + ordering: Optional[str] = None, + page_size: int = DEFAULT_PAGE_SIZE, + max_pages: Optional[int] = None, + checkpoint_callback=None, + records_callback=None, + session: Optional[requests.Session] = None, + create_ts_gte_filter: Optional[str] = None, + create_ts_lt_filter: Optional[str] = None, + phase_label: Optional[str] = None, +) -> Tuple[int, Optional[str], bool]: + """ + Paginate through all pages for an entity, calling records_callback on each page. + + Pagination ends on a truly empty page (len == 0). A partial page is NOT treated as + the end: Oracle pagination bugs can produce partial intermediate pages, and live + modifications can shift records in or out mid-stream. Only an empty page + unambiguously means no more records exist. + + Returns: + (total_records, extreme_mod_ts, finished_all_pages) + extreme_mod_ts: max mod_ts for ASC ordering, min for DESC — used as the next cursor. + finished_all_pages: True if all available pages were consumed. + """ + normalized_mod_ts_gte = ( + normalize_timestamp_to_oracle_format(mod_ts_filter) if mod_ts_filter else None + ) + normalized_mod_ts_lt = ( + normalize_timestamp_to_oracle_format(mod_ts_lt_filter) if mod_ts_lt_filter else None + ) + normalized_create_ts_gte = ( + normalize_timestamp_to_oracle_format(create_ts_gte_filter) + if create_ts_gte_filter + else None + ) + normalized_create_ts_lt = ( + normalize_timestamp_to_oracle_format(create_ts_lt_filter) if create_ts_lt_filter else None + ) + + is_desc = bool(ordering and ordering.startswith("-")) + total_records = 0 + pages_fetched = 0 + pages_since_checkpoint = 0 + extreme_mod_ts = None + page = 1 + total_pages = None # unknown until Oracle tells us; only ever increases once set + is_exhausted = False # True when an empty page confirms no more records + fetch_start = time.time() + total_api_ms = 0 + max_page_ms = 0 + # DESC only: extreme_mod_ts when max_pages was first hit. Keeps the loop running + # until the timestamp changes (handles same-ts bulk imports safely). + ts_when_max_reached = None + + while (total_pages is None or page <= total_pages) and ( + max_pages is None + or pages_fetched < max_pages + or (is_desc and ts_when_max_reached is not None and extreme_mod_ts == ts_when_max_reached) + ): + + # Adaptive page size: on timeout, halve page_size and recalculate the page number + # to preserve the same record offset. Reduction persists for all subsequent pages. + while True: + page_start = time.time() + try: + response_data = make_api_request( + base_url, + username, + password, + entity, + page, + normalized_mod_ts_gte, + normalized_mod_ts_lt, + page_size, + ordering, + session, + normalized_create_ts_gte, + normalized_create_ts_lt, + ) + break + except requests.exceptions.Timeout: + if page_size <= MIN_PAGE_SIZE: + log.error( + f"{entity}: page {page} timed out at minimum " + f"page_size={page_size}, giving up" + ) + raise + old_offset = (page - 1) * page_size + page_size = max(page_size // 2, MIN_PAGE_SIZE) + page = (old_offset // page_size) + 1 + total_pages = None # reset; Oracle will report new page_count at the new page_size + log.warning( + f"{entity}: page timed out, retrying at page {page} with page_size={page_size}" + ) + + page_ms = round((time.time() - page_start) * 1000) + total_api_ms += page_ms + max_page_ms = max(max_page_ms, page_ms) + + records = response_data.get("results", []) + + # Oracle's page_count can decrease mid-pagination (stale cache). Accept lower values + # immediately — a lower bound stops the loop before a phantom last page produces a 500. + new_page_count = response_data.get("page_count") + if new_page_count is not None: + if total_pages is not None and new_page_count < total_pages: + log.warning( + f"{entity}: page_count dropped {total_pages} → {new_page_count} on page {page}" + ) + total_pages = new_page_count + else: + total_pages = ( + new_page_count if total_pages is None else max(total_pages, new_page_count) + ) + + if len(records) == 0: + is_exhausted = True + break + + current_page = response_data.get("page_nbr", page) + if current_page % 50 == 0: + pct_str = f"{round(current_page / total_pages * 100, 1)}%" if total_pages else "?" + log.info( + f"{entity}: page {current_page}/{total_pages or '?'} ({pct_str}) " + f"— {len(records)} records this page, {total_records + len(records)} synced so far" + ) + + for record in records: + ts = record.get("mod_ts") + if ts: + if extreme_mod_ts is None: + extreme_mod_ts = ts + elif is_desc: + extreme_mod_ts = min(extreme_mod_ts, ts) + else: + extreme_mod_ts = max(extreme_mod_ts, ts) + + if records_callback: + records_callback(records) + + total_records += len(records) + page += 1 + pages_fetched += 1 + pages_since_checkpoint += 1 + + if ( + is_desc + and max_pages is not None + and pages_fetched == max_pages + and ts_when_max_reached is None + ): + ts_when_max_reached = extreme_mod_ts + if ts_when_max_reached: + log.warning( + f"{entity}: reached max_pages={max_pages} but cursor is still " + f"{ts_when_max_reached} — possible bulk import; " + f"continuing until timestamp changes" + ) + + if ( + checkpoint_callback + and pages_since_checkpoint >= CHECKPOINT_INTERVAL_PAGES + and extreme_mod_ts + ): + checkpoint_callback(extreme_mod_ts) + pages_since_checkpoint = 0 + + if ts_when_max_reached is not None and extreme_mod_ts != ts_when_max_reached: + log.info( + f"{entity}: timestamp changed {ts_when_max_reached} → {extreme_mod_ts} " + f"after {pages_fetched} pages — bulk import cleared" + ) + + finished = is_exhausted or (total_pages is not None and page > total_pages) + elapsed = time.time() - fetch_start + if phase_label: + log.info( + f"{entity}: {phase_label} complete — {total_records} records, {pages_fetched} pages" + ) + else: + avg_ms = round(total_api_ms / pages_fetched) if pages_fetched else 0 + rps = round(total_records / elapsed, 1) if elapsed else 0 + log.info( + f"Fetch complete for {entity}: {total_records} records in {round(elapsed, 1)}s " + f"({rps} rec/s) — {pages_fetched} pages, page_size={page_size}, " + f"avg {avg_ms}ms/page, max {max_page_ms}ms/page, finished={finished}" + ) + return total_records, extreme_mod_ts, finished diff --git a/wms_oracle/backfill.py b/wms_oracle/backfill.py new file mode 100644 index 0000000..6f5997d --- /dev/null +++ b/wms_oracle/backfill.py @@ -0,0 +1,247 @@ +"""Historical backfill for Oracle WMS entities. + +Fetches records in DESC order (newest first) within rolling 30-day windows so the +destination is populated with recent data as quickly as possible. + +Pagination strategy: + - ordering="-mod_ts,id" for stable pagination within same-timestamp groups + - Full pages: advance the page offset within the current window + - Past max_pages: keep fetching until page_max_ts < prev_page_min_ts, which guarantees + the entire same-timestamp group at the boundary has been consumed before checkpointing + - Partial page: window exhausted — slide cursor back BACKFILL_WINDOW_DAYS and reset + - Empty window: slide back and checkpoint; stop after BACKFILL_MAX_EMPTY_WINDOWS consecutive +""" + +import requests +from datetime import datetime, timedelta +from typing import Optional, Tuple + +from fivetran_connector_sdk import Logging as log + +from api import make_api_request +from utils import ( + MIN_PAGE_SIZE, + BACKFILL_WINDOW_DAYS, + BACKFILL_MAX_EMPTY_WINDOWS, + normalize_timestamp_to_oracle_format, +) + + +def run_backfill_phase( + base_url: str, + username: str, + password: str, + entity: str, + backfill_cursor: Optional[str], + max_pages: int, + page_size: int, + handle_records, + checkpoint_fn, + session: requests.Session, +) -> Tuple[int, Optional[str], bool]: + """ + Run the historical backfill for one entity. + + Args: + backfill_cursor: Upper bound (mod_ts__lt) for the current window. None on first sync. + max_pages: Soft page limit per sync. Fetching continues past this until the + timestamp group at the boundary is fully consumed. + handle_records: Callable(records: list) — called for each page of results. + checkpoint_fn: Callable(cursor_dt: datetime) — saves backfill progress to state. + + Returns: + (total_records, new_backfill_cursor, finished) + new_backfill_cursor: None if backfill completed, otherwise the new window anchor. + finished: True if all historical data has been consumed. + + Raises: + OrderingNotSupportedError: if Oracle returns 400 for -mod_ts ordering. + Caller should fall back to an unordered full scan. + """ + if backfill_cursor is None: + log.info(f"Starting historical backfill for {entity} (DESC, offset pagination)") + else: + log.info( + f"Resuming backfill for {entity}: window cursor={backfill_cursor} " + f"(DESC, offset pagination)" + ) + + cursor_dt: Optional[datetime] = ( + datetime.fromisoformat(backfill_cursor) if backfill_cursor else None + ) + bf_page = 1 + bf_pages_fetched = 0 + bf_records = 0 + bf_pages_since_log = 0 + bf_min_mod_ts_seen = None + bf_prev_page_min_ts = None + bf_page_size = page_size + bf_finished = False + bf_consecutive_empty = 0 + bf_cursor_rollback = None # set on timeout when prev page data exists + + while True: + # ── Cursor rollback: a mid-window timeout occurred on a previous pass ── + # Pages 1..N-1 were already fetched. Roll the window cursor back to the + # oldest mod_ts seen so far so we restart from a stable Oracle offset. + if bf_cursor_rollback is not None: + cursor_dt = datetime.fromisoformat(bf_cursor_rollback) + log.warning( + f"{entity}: backfill rolling back cursor to {bf_cursor_rollback} " + f"after timeout, restarting from page 1" + ) + bf_cursor_rollback = None + bf_page = 1 + bf_prev_page_min_ts = None + + cursor_str = ( + normalize_timestamp_to_oracle_format(cursor_dt.isoformat()) if cursor_dt else None + ) + lower_dt = cursor_dt - timedelta(days=BACKFILL_WINDOW_DAYS) if cursor_dt else None + lower_str = ( + normalize_timestamp_to_oracle_format(lower_dt.isoformat()) if lower_dt else None + ) + + # Adaptive page-size retry on timeout + while True: + try: + response_data = make_api_request( + base_url, + username, + password, + entity, + page=bf_page, + mod_ts_filter=lower_str, + mod_ts_lt_filter=cursor_str, + ordering="-mod_ts,id", + page_size=bf_page_size, + session=session, + ) + break + except requests.exceptions.Timeout: + if bf_page_size <= MIN_PAGE_SIZE: + log.error( + f"{entity}: backfill page {bf_page} timed out at minimum " + f"page_size={bf_page_size}, giving up" + ) + raise + if bf_prev_page_min_ts is not None: + # Mid-window timeout: pages 1..N-1 already fetched and safe. + # Roll back the cursor to bf_prev_page_min_ts and restart + # from page 1 to avoid Oracle pagination instability at the + # current offset boundary. + # Round UP to the next whole second when sub-second precision + # is present: Oracle truncates mod_ts__lt to seconds, so + # mod_ts__lt=02:25:50 would exclude records at 02:25:50.440928. + _prev_dt = datetime.fromisoformat(bf_prev_page_min_ts) + if _prev_dt.microsecond: + _prev_dt = _prev_dt.replace(microsecond=0) + timedelta(seconds=1) + bf_cursor_rollback = _prev_dt.isoformat() + bf_page_size = max(bf_page_size // 2, MIN_PAGE_SIZE) + log.warning( + f"{entity}: backfill page timed out at page {bf_page} " + f"(page_size={bf_page_size * 2}→{bf_page_size}); " + f"will roll back cursor to {bf_cursor_rollback}" + ) + break # exit inner retry loop; outer loop handles the rollback + else: + # First-page timeout: no data fetched yet, just shrink page size + old_offset = (bf_page - 1) * bf_page_size + bf_page_size = max(bf_page_size // 2, MIN_PAGE_SIZE) + bf_page = (old_offset // bf_page_size) + 1 + log.warning( + f"{entity}: backfill page timed out, retrying at page {bf_page} " + f"with page_size={bf_page_size}" + ) + + # If a rollback was triggered in the inner loop, restart the outer loop + if bf_cursor_rollback is not None: + continue + + records = response_data.get("results", []) + bf_pages_fetched += 1 + bf_pages_since_log += 1 + + # ── Empty window: slide back one period ────────────────────────────── + if not records: + bf_consecutive_empty += 1 + if bf_consecutive_empty >= BACKFILL_MAX_EMPTY_WINDOWS: + log.info( + f"{entity}: {BACKFILL_MAX_EMPTY_WINDOWS} consecutive empty windows " + f"— backfill complete" + ) + bf_finished = True + break + log.info( + f"{entity}: empty window — sliding back {BACKFILL_WINDOW_DAYS} days " + f"(empty #{bf_consecutive_empty})" + ) + cursor_dt = lower_dt + bf_page = 1 + bf_prev_page_min_ts = None + bf_pages_since_log = 0 + if cursor_dt: + checkpoint_fn(cursor_dt) + if bf_pages_fetched >= max_pages: + break + continue + + bf_consecutive_empty = 0 + handle_records(records) + bf_records += len(records) + + page_min_ts = min((r["mod_ts"] for r in records if r.get("mod_ts")), default=None) + page_max_ts = max((r["mod_ts"] for r in records if r.get("mod_ts")), default=None) + + if page_min_ts and (bf_min_mod_ts_seen is None or page_min_ts < bf_min_mod_ts_seen): + bf_min_mod_ts_seen = page_min_ts + + # ── Partial page: window exhausted, slide cursor back ───────────────── + if len(records) < bf_page_size: + cursor_dt = lower_dt + bf_page = 1 + bf_prev_page_min_ts = None + bf_pages_since_log = 0 + if cursor_dt: + checkpoint_fn(cursor_dt) + if bf_pages_fetched >= max_pages: + break + continue + + # ── Full page: advance offset within the current window ─────────────── + bf_page += 1 + + # Past max_pages: keep fetching until the entire current timestamp group is consumed. + # Break only when this page's max_ts is strictly below the previous page's min_ts — + # that guarantees no same-ts records remain on unfetched pages. + if bf_pages_fetched == max_pages: + log.info( + f"{entity}: reached max_pages={max_pages} mid-timestamp group " + f"(current ts={page_max_ts}) — continuing until timestamp changes" + ) + if ( + bf_pages_fetched >= max_pages + and bf_prev_page_min_ts is not None + and page_max_ts is not None + and page_max_ts < bf_prev_page_min_ts + ): + cursor_dt = datetime.fromisoformat(bf_prev_page_min_ts) + checkpoint_fn(cursor_dt) + break + + bf_prev_page_min_ts = page_min_ts + + if bf_pages_since_log >= 10: + log.info( + f"{entity}: backfill progress — {bf_records:,} records, {bf_pages_fetched} pages, " + f"oldest mod_ts={bf_min_mod_ts_seen}" + ) + bf_pages_since_log = 0 + + log.info( + f"{entity}: backfill {'complete' if bf_finished else 'paused'} " + f"— {bf_records} records, {bf_pages_fetched} pages" + ) + + new_cursor = None if bf_finished else (cursor_dt.isoformat() if cursor_dt else None) + return bf_records, new_cursor, bf_finished diff --git a/wms_oracle/configuration.json b/wms_oracle/configuration.json new file mode 100644 index 0000000..988ba32 --- /dev/null +++ b/wms_oracle/configuration.json @@ -0,0 +1,9 @@ +{ + "base_url": "https://.wms.ocs.oraclecloud.com/", + "username": "", + "password": "", + "page_size": "1000", + "max_pages": "100", + "lookback_check_hours": "24", + "test_entities": "" +} diff --git a/wms_oracle/connector.py b/wms_oracle/connector.py new file mode 100644 index 0000000..cd817db --- /dev/null +++ b/wms_oracle/connector.py @@ -0,0 +1,696 @@ +"""Oracle WMS → Fivetran connector. + +Syncs warehouse management entities from the Oracle WMS REST API to a Fivetran destination. + +Design overview: + - Incremental sync: two-phase per entity + Phase 1 (mod_ts): cursor-advancement with ordering="mod_ts,id" (stable same-ts pagination) + Phase 2 (create_ts): catch-up for records created after the cursor with a backdated mod_ts + - Backfill: DESC offset pagination in rolling 30-day windows, newest data first, + ordering="-mod_ts,id" for stable same-ts pagination + - Entities with active backfill run in parallel (ThreadPoolExecutor) + - Incremental-only entities run sequentially (no parallelism benefit, avoids lock contention) + - Mid-sync backfill checkpoints preserve per-entity progress; final checkpoint clears the flag + +""" + +# Import required classes from fivetran_connector_sdk +from fivetran_connector_sdk import Connector + +# For enabling logs in your connector code +from fivetran_connector_sdk import Logging as log + +# For supporting data operations like upsert(), update(), delete(), and checkpoint() +from fivetran_connector_sdk import Operations as op + +import json +import requests +from datetime import datetime +from typing import Optional +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +from threading import Lock + +from utils import ( + ORACLE_WMS_ENTITIES, + DEFAULT_PAGE_SIZE, + DEFAULT_MAX_PAGES, + MAX_CONCURRENT_ENTITIES, + OrderingNotSupportedError, + validate_configuration, + get_current_timestamp, + to_utc, +) +from api import check_entity_has_mod_ts, probe_entity_count, fetch_entity_data +from incremental import run_incremental_phase +from backfill import run_backfill_phase +from pre_sync_drift_check import run_pre_cursor_hourly_check, run_daily_counts + +# ── Schema ──────────────────────────────────────────────────────────────────── + + +def schema(configuration: dict): + """ + Define the schema function which lets you configure the schema your connector delivers. + See the technical reference documentation for more details on the schema function: + https://fivetran.com/docs/connector-sdk/technical-reference/connector-sdk-code/connector-sdk-methods#schema + Args: + configuration: a dictionary that holds the configuration settings for the connector. + """ + return [ + {"table": "allocation", "primary_key": ["id"]}, + {"table": "batch_number", "primary_key": ["id"]}, + {"table": "company", "primary_key": ["id"]}, + {"table": "container", "primary_key": ["id"]}, + {"table": "container_lock_xref", "primary_key": ["id"]}, + {"table": "facility", "primary_key": ["id"]}, + {"table": "history_activity", "primary_key": ["id"]}, + {"table": "ib_container", "primary_key": ["id"]}, + {"table": "ib_shipment", "primary_key": ["id"]}, + {"table": "ib_shipment_dtl", "primary_key": ["id"]}, + {"table": "inventory", "primary_key": ["id"]}, + {"table": "inventory_attribute", "primary_key": ["id"]}, + {"table": "inventory_lock", "primary_key": ["id"]}, + {"table": "inventory_status", "primary_key": ["id"]}, + {"table": "item", "primary_key": ["id"]}, + {"table": "item_metric", "primary_key": ["id"]}, + {"table": "location", "primary_key": ["id"]}, + {"table": "order_dtl", "primary_key": ["id"]}, + {"table": "order_hdr", "primary_key": ["id"]}, + {"table": "order_status", "primary_key": ["id"]}, + {"table": "order_type", "primary_key": ["id"]}, + {"table": "purchase_order_dtl", "primary_key": ["id"]}, + {"table": "purchase_order_hdr", "primary_key": ["id"]}, + {"table": "purchase_order_status", "primary_key": ["id"]}, + {"table": "putaway_type", "primary_key": ["id"]}, + {"table": "vendor", "primary_key": ["id"]}, + {"table": "counts_by_day", "primary_key": ["table_name", "mod_ts_day", "batch_id"]}, + { + "table": "pre_cursor_hourly_counts", + "primary_key": ["table_name", "hour_start", "batch_id"], + }, + ] + + +# ── Entity processing ───────────────────────────────────────────────────────── + + +def process_entity( + base_url: str, + username: str, + password: str, + entity: str, + incremental_cursor: Optional[str], + backfill_cursor: Optional[str], + sync_start_time: str, + entity_cursors_live: dict, + entity_backfill_cursors_snapshot: dict, + entity_mod_ts_support_snapshot: dict, + in_progress_backfill_cursors: dict, + lock: Lock, + has_mod_ts: bool = True, + page_size: int = DEFAULT_PAGE_SIZE, + max_pages: Optional[int] = None, + run_incremental: bool = True, + run_backfill: bool = True, +) -> dict: + """ + Sync a single entity. run_incremental / run_backfill flags allow the caller + to run only one phase (e.g. incremental sequentially, then backfill in parallel). + Returns a result dict consumed by record_result(). + """ + entity_start = time.time() + try: + log.info(f"Processing entity: {entity}") + + total_records = 0 + new_backfill_cursor = None + backfill_ran = False + incremental_max_mod_ts = None + + def handle_records(records: list): + for record in records: + # Upsert each record into the destination table, inserting or updating as needed. + op.upsert(table=entity, data=record) + + def checkpoint_incremental(cursor_dt: datetime): + """Checkpoint incremental progress when cursor advances to a new timestamp.""" + cursor_str = to_utc(cursor_dt.isoformat()) + with lock: + entity_cursors_live[entity] = cursor_str + # Save progress so the sync can resume from this cursor if interrupted. + op.checkpoint( + { + "entity_cursors": dict(entity_cursors_live), + "entity_backfill_cursors": { + **entity_backfill_cursors_snapshot, + **dict(in_progress_backfill_cursors), + }, + "entity_mod_ts_support": dict(entity_mod_ts_support_snapshot), + "sync_in_progress": True, + } + ) + log.info(f"Incremental checkpoint for {entity}: cursor={cursor_str}") + + def checkpoint_backfill(cursor_dt: datetime): + """Checkpoint backfill progress to state. Thread-safe via shared lock.""" + cursor_str = to_utc(cursor_dt.isoformat()) + with lock: + in_progress_backfill_cursors[entity] = cursor_str + # Save backfill progress so the window cursor is preserved + # if the sync is interrupted. + op.checkpoint( + { + "entity_cursors": dict(entity_cursors_live), + "entity_backfill_cursors": { + **entity_backfill_cursors_snapshot, + **dict(in_progress_backfill_cursors), + }, + "entity_mod_ts_support": dict(entity_mod_ts_support_snapshot), + "sync_in_progress": True, + } + ) + log.info(f"Backfill checkpoint for {entity}: cursor={cursor_str}") + + with requests.Session() as session: + if not has_mod_ts: + # ── Full sync (no mod_ts support) ──────────────────────────── + log.info( + f"Full sync for {entity} " + f"(no mod_ts support, existing records _fivetran_deleted = true)" + ) + # Truncate soft-deletes all existing rows before the full re-scan + # so removed records are marked deleted. + op.truncate(table=entity) + count, _, _ = fetch_entity_data( + base_url, + username, + password, + entity, + page_size=page_size, + records_callback=handle_records, + session=session, + ) + total_records += count + + else: + # ── Phase 1: Incremental ───────────────────────────────────── + if run_incremental and incremental_cursor: + inc_count, incremental_max_mod_ts = run_incremental_phase( + base_url, + username, + password, + entity, + cursor=incremental_cursor, + sync_start_time=sync_start_time, + page_size=page_size, + handle_records=handle_records, + session=session, + checkpoint_fn=checkpoint_incremental, + ) + total_records += inc_count + + # ── Phase 2: Backfill ──────────────────────────────────────── + is_first_sync = not incremental_cursor and backfill_cursor is None + ongoing_backfill = backfill_cursor is not None + + if run_backfill and (is_first_sync or ongoing_backfill): + backfill_ran = True + try: + # On first sync backfill_cursor is None (no prior state). Seed it at + # sync_start_time so the first window starts at the current time and + # walks cleanly backwards — prevents a gap between the backfill cursor + # and the incremental cursor. + initial_backfill_cursor = ( + backfill_cursor if backfill_cursor is not None else sync_start_time + ) + bf_count, new_backfill_cursor, _ = run_backfill_phase( + base_url, + username, + password, + entity, + backfill_cursor=initial_backfill_cursor, + max_pages=max_pages, + page_size=page_size, + handle_records=handle_records, + checkpoint_fn=checkpoint_backfill, + session=session, + ) + total_records += bf_count + except OrderingNotSupportedError: + log.warning( + f"{entity} does not support DESC ordering — falling back to full scan" + ) + count, _, _ = fetch_entity_data( + base_url, + username, + password, + entity, + page_size=page_size, + records_callback=handle_records, + session=session, + ) + total_records += count + new_backfill_cursor = None + + elapsed = round(time.time() - entity_start, 1) + return { + "entity": entity, + "success": True, + "record_count": total_records, + "elapsed_seconds": elapsed, + "error_msg": None, + "new_backfill_cursor": new_backfill_cursor, + "backfill_ran": backfill_ran, + "new_incremental_cursor": incremental_max_mod_ts, + "incremental_ran": run_incremental, + } + + except Exception as e: + elapsed = round(time.time() - entity_start, 1) + log.error(f"Failed to process entity {entity}: {e}") + return { + "entity": entity, + "success": False, + "record_count": 0, + "elapsed_seconds": elapsed, + "error_msg": str(e), + "new_backfill_cursor": backfill_cursor, # Preserve existing cursor on failure + "backfill_ran": False, + } + + +# ── Sync orchestration ──────────────────────────────────────────────────────── + + +def update(configuration: dict, state: dict): + """ + Entry point called by Fivetran on each sync. + + State structure: + entity_cursors: {entity: timestamp} — incremental cursor per entity + entity_backfill_cursors: {entity: timestamp} — backfill window anchor per entity; + absent once backfill is complete + entity_mod_ts_support: {entity: bool} — cached describe-endpoint results + sync_in_progress: bool — True in mid-sync checkpoints, False in final checkpoint + """ + sync_wall_start = time.time() + log.info("Oracle WMS Connector: Starting sync") + + validate_configuration(configuration) + + base_url = configuration.get("base_url") + username = configuration.get("username") + password = configuration.get("password") + page_size = int(configuration.get("page_size", DEFAULT_PAGE_SIZE)) + max_pages = int(configuration.get("max_pages", DEFAULT_MAX_PAGES)) + lookback_check_hours = int(configuration.get("lookback_check_hours", 24)) + test_entities_raw = configuration.get("test_entities") + test_entities = ( + [e.strip() for e in test_entities_raw.split(",")] if test_entities_raw else None + ) + + entity_cursors = state.get("entity_cursors", {}) + entity_backfill_cursors = state.get("entity_backfill_cursors", {}) + + sync_start_time = get_current_timestamp() + + entities_to_sync = [ + e for e in ORACLE_WMS_ENTITIES if test_entities is None or e in test_entities + ] + log.info( + f"Sync started at: {sync_start_time} | " + f"page_size={page_size}, max_pages={max_pages}, concurrency={MAX_CONCURRENT_ENTITIES}" + ) + log.info(f"Processing {len(entities_to_sync)} entities") + + # ── Discover mod_ts support (once per entity, cached in state) ──────────── + entity_mod_ts_support = state.get("entity_mod_ts_support", {}) + entities_to_check = [e for e in entities_to_sync if e not in entity_mod_ts_support] + if entities_to_check: + log.info(f"Checking mod_ts support for {len(entities_to_check)} entities in parallel") + with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_ENTITIES) as executor: + futures = { + executor.submit(check_entity_has_mod_ts, base_url, username, password, e): e + for e in entities_to_check + } + for future in as_completed(futures): + entity = futures[future] + entity_mod_ts_support[entity] = future.result() + support_str = "supports" if entity_mod_ts_support[entity] else "does not support" + sync_type = "incremental" if entity_mod_ts_support[entity] else "full" + log.info(f"Entity {entity} {support_str} mod_ts - will use {sync_type} sync") + + # ── Probe result counts to sort entities largest-first ──────────────────── + log.info(f"Probing {len(entities_to_sync)} entities to determine processing order…") + entity_counts = {} + with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_ENTITIES) as executor: + probe_futures = {} + for entity in entities_to_sync: + has_mod_ts = entity_mod_ts_support.get(entity, False) + incremental_cursor = entity_cursors.get(entity) if has_mod_ts else None + backfill_cursor = entity_backfill_cursors.get(entity) if has_mod_ts else None + + if has_mod_ts and (not incremental_cursor or backfill_cursor is not None): + probe_ordering, probe_lt, probe_gte = "-mod_ts", backfill_cursor, None + elif has_mod_ts and incremental_cursor: + probe_ordering, probe_lt, probe_gte = "mod_ts", None, incremental_cursor + else: + probe_ordering, probe_lt, probe_gte = None, None, None + + future = executor.submit( + probe_entity_count, + base_url, + username, + password, + entity, + probe_gte, + probe_lt, + probe_ordering, + ) + probe_futures[future] = entity + + for future in as_completed(probe_futures): + entity_counts[probe_futures[future]] = future.result() + + entities_to_sync.sort(key=lambda e: entity_counts.get(e, 0), reverse=True) + log.info( + "Processing order (by record count): " + + ", ".join(f"{e}({entity_counts.get(e, 0):,})" for e in entities_to_sync) + ) + + # ── Classify: incremental-only (sequential) vs backfill/full-scan (parallel) ── + incremental_only = [] + needs_backfill = [] + for entity in entities_to_sync: + has_mod_ts = entity_mod_ts_support.get(entity, False) + incremental_cursor = entity_cursors.get(entity) if has_mod_ts else None + backfill_cursor = entity_backfill_cursors.get(entity) if has_mod_ts else None + is_first_sync = has_mod_ts and not incremental_cursor and backfill_cursor is None + if has_mod_ts and incremental_cursor and backfill_cursor is None and not is_first_sync: + incremental_only.append(entity) + else: + needs_backfill.append(entity) + + log.info( + f"Execution plan: {len(incremental_only)} incremental-only (sequential), " + f"{len(needs_backfill)} need backfill/full-scan (parallel)" + ) + + # Shared state updated by worker threads during mid-sync backfill checkpoints + in_progress_backfill_cursors: dict = {} + lock = Lock() + new_entity_backfill_cursors: dict = {} + completed_entities: set = set() + entity_results: dict = {} + all_success = True + + def build_backfill_state() -> dict: + """Merge completed + in-progress + original cursors into a consistent snapshot.""" + with lock: + in_progress = dict(in_progress_backfill_cursors) + result = { + e: in_progress.get(e, cursor) + for e, cursor in entity_backfill_cursors.items() + if e not in completed_entities + } + result.update(new_entity_backfill_cursors) + return result + + def record_result(result: dict): + """Update cursors and checkpoint after an entity completes.""" + nonlocal all_success + entity = result["entity"] + has_mod_ts = entity_mod_ts_support.get(entity, False) + entity_results[entity] = result + completed_entities.add(entity) + + if result["success"]: + if has_mod_ts: + if result.get("incremental_ran", True): + # Only advance cursor when we actually observed records at a new timestamp. + # Never jump to sync_start_time on an empty window — an empty response could + # indicate a partial or silent API outage rather than a genuinely empty range. + new_inc_cursor = result.get("new_incremental_cursor") + if new_inc_cursor: + entity_cursors[entity] = to_utc(new_inc_cursor) + if result["backfill_ran"]: + if result["new_backfill_cursor"] is not None: + new_entity_backfill_cursors[entity] = result["new_backfill_cursor"] + else: + # Backfill complete — remove from new cursors. + # The incremental pass (Pass 1) may have already added this entity + # to new_entity_backfill_cursors to preserve the in-progress cursor; + # now that backfill is done we must explicitly drop it. + new_entity_backfill_cursors.pop(entity, None) + # Seed the incremental cursor if not already set — whether backfill + # completed or just made progress. Without this, records modified after + # the backfill window has passed them are never picked up: backfill won't + # revisit them and Phase 1 won't run without a cursor. + if not entity_cursors.get(entity): + entity_cursors[entity] = sync_start_time + log.info(f"{entity}: seeding incremental cursor at {sync_start_time}") + elif entity in entity_backfill_cursors: + # Incremental-only this sync, but backfill still ongoing — preserve cursor + new_entity_backfill_cursors[entity] = entity_backfill_cursors[entity] + else: + entity_cursors.pop(entity, None) + + # Checkpoint after each entity completes so partial sync progress is not lost. + op.checkpoint( + { + "entity_cursors": dict(entity_cursors), + "entity_backfill_cursors": build_backfill_state(), + "entity_mod_ts_support": entity_mod_ts_support, + "sync_in_progress": True, + } + ) + inc_cursor = entity_cursors.get(entity) + bf_cursor = new_entity_backfill_cursors.get(entity) + backfill_just_completed = ( + result.get("backfill_ran") and result.get("new_backfill_cursor") is None + ) + cursor_parts = [] + if inc_cursor: + cursor_parts.append(f"cursor={inc_cursor}") + if backfill_just_completed: + cursor_parts.append("backfill=complete") + elif bf_cursor: + cursor_parts.append(f"backfill={bf_cursor}") + if cursor_parts: + log.info(f"Checkpointed {entity}: {', '.join(cursor_parts)}") + else: + log.error(f"Failed to process {entity}: {result['error_msg']}") + all_success = False + preserved = in_progress_backfill_cursors.get( + entity, entity_backfill_cursors.get(entity) + ) + if preserved: + new_entity_backfill_cursors[entity] = preserved + + def submit_entity( + entity: str, run_incremental: bool = True, run_backfill: bool = True + ) -> dict: + """Build process_entity kwargs and dispatch.""" + has_mod_ts = entity_mod_ts_support.get(entity, False) + incremental_cursor = entity_cursors.get(entity) if has_mod_ts else None + backfill_cursor = entity_backfill_cursors.get(entity) if has_mod_ts else None + + return process_entity( + base_url, + username, + password, + entity, + incremental_cursor, + backfill_cursor, + sync_start_time, + entity_cursors, + dict(entity_backfill_cursors), + dict(entity_mod_ts_support), + in_progress_backfill_cursors, + lock, + has_mod_ts, + page_size, + max_pages, + run_incremental=run_incremental, + run_backfill=run_backfill, + ) + + try: + # ── Pre-pass: hourly drift check for entities with cursors ──────────── + # Probes counts for the 24 clock-aligned hours before each cursor. + # Re-pulls any hour whose count increased since the last sync. + entities_with_cursor = [ + e for e in entities_to_sync if entity_mod_ts_support.get(e) and entity_cursors.get(e) + ] + prev_hourly_counts = state.get("pre_cursor_hourly_counts", {}) + new_hourly_counts, repull_summary = run_pre_cursor_hourly_check( + base_url, + username, + password, + entities_with_cursor, + entity_cursors, + sync_start_time, + prev_hourly_counts, + page_size, + hours=lookback_check_hours, + ) + + run_daily_counts( + base_url, + username, + password, + entities_with_cursor, + sync_start_time, + ) + + # ── Pass 1: Sequential incremental for ALL entities with a cursor ───── + # Runs before any backfill so incremental phases are never concurrent, + # regardless of whether an entity also has an active backfill. + for entity in entities_with_cursor: + try: + record_result(submit_entity(entity, run_incremental=True, run_backfill=False)) + except Exception as e: + log.error(f"Error processing {entity} (incremental): {e}") + all_success = False + completed_entities.add(entity) + + # ── Pass 2: Parallel backfill / full-scan ───────────────────────────── + # Incremental already ran in pass 1 for entities that had a cursor. + if needs_backfill: + with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_ENTITIES) as executor: + futures = { + executor.submit(submit_entity, entity, False, True): entity + for entity in needs_backfill + } + for future in as_completed(futures): + entity = futures[future] + try: + record_result(future.result()) + except Exception as e: + log.error(f"Error processing {entity} (backfill): {e}") + all_success = False + completed_entities.add(entity) + + # ── Pass 3: Sequential incremental for entities with no backfill ────── + # Entities that finished their backfill before this sync and weren't + # covered by pass 1 (i.e. incremental_only entities not yet processed). + for entity in incremental_only: + if entity not in completed_entities: + try: + record_result(submit_entity(entity)) + except Exception as e: + log.error(f"Error processing {entity}: {e}") + all_success = False + completed_entities.add(entity) + + # ── Sync summary ────────────────────────────────────────────────────── + results_sorted = sorted(entity_results.values(), key=lambda r: r["entity"]) + total_records = sum(r.get("record_count", 0) for r in results_sorted) + sync_elapsed = round(time.time() - sync_wall_start, 1) + incremental_results = [ + r for r in results_sorted if entity_mod_ts_support.get(r["entity"], False) + ] + full_sync_results = [ + r for r in results_sorted if not entity_mod_ts_support.get(r["entity"], False) + ] + + log.info(f"--- Sync summary ({sync_elapsed}s wall, {total_records:,} total records) ---") + for r in incremental_results: + status = "✓" if r["success"] else "✗" + entity = r["entity"] + rec_str = f"{r.get('record_count', 0):,} records in {r.get('elapsed_seconds', '?')}s" + cursor = entity_cursors.get(entity) + backfill = new_entity_backfill_cursors.get(entity, "complete") + cursor_str = f"cursor={cursor}" if cursor else "" + bf_str = f"backfill={backfill}" if backfill else "" + detail = " | ".join(p for p in [cursor_str, bf_str] if p) + log.info(f" {status} {entity}: {rec_str} | {detail}") + log.info("--- End: Sync summary ---") + + if full_sync_results: + log.info("--- Full sync tables ---") + for r in full_sync_results: + status = "✓" if r["success"] else "✗" + rec_str = ( + f"{r.get('record_count', 0):,} records in {r.get('elapsed_seconds', '?')}s" + ) + log.info(f" {status} {r['entity']}: {rec_str}") + log.info("--- End: Full sync tables ---") + + completed_bf = [ + r["entity"] + for r in results_sorted + if r.get("success") and r.get("backfill_ran") and r.get("new_backfill_cursor") is None + ] + if completed_bf or new_entity_backfill_cursors: + log.info("--- Backfill summary ---") + if completed_bf: + log.info(f" Completed this sync: {', '.join(sorted(completed_bf))}") + if new_entity_backfill_cursors: + log.info(f" Still in progress ({len(new_entity_backfill_cursors)} entities):") + for e, cursor in sorted(new_entity_backfill_cursors.items(), key=lambda x: x[1]): + probe_total = entity_counts.get(e, 0) + records_this_sync = entity_results.get(e, {}).get("record_count", 0) + if probe_total > 0: + pct = round(records_this_sync / probe_total * 100, 1) + log.info( + f" {e}: reached {cursor} — fetched {pct}% " + f"of {probe_total:,} pending records this sync" + ) + else: + log.info(f" {e}: reached {cursor}") + log.info("--- End: Backfill summary ---") + + if repull_summary: + log.info("--- Repull Summary ---") + for level, msg in repull_summary: + if level == "warning": + log.warning(msg) + else: + log.info(msg) + log.info("--- End: Repull Summary ---") + + # Final checkpoint marks the sync as complete and persists hourly drift counts + # for the next run. + op.checkpoint( + { + "entity_cursors": entity_cursors, + "entity_backfill_cursors": new_entity_backfill_cursors, + "entity_mod_ts_support": entity_mod_ts_support, + "sync_in_progress": False, + "pre_cursor_hourly_counts": new_hourly_counts, + } + ) + + if not all_success: + log.error( + f"Sync completed with failures | page_size={page_size}, max_pages={max_pages}" + ) + raise RuntimeError( + "One or more entities failed; checkpointed partial progress for retry" + ) + else: + log.info( + f"Sync completed successfully | page_size={page_size}, " + f"max_pages={max_pages}, concurrency={MAX_CONCURRENT_ENTITIES}" + ) + + except RuntimeError: + raise + except Exception as e: + log.error(f"Oracle WMS Connector failed: {e}") + raise RuntimeError(f"Failed to sync Oracle WMS data: {e}") + + +# ── Entry point ─────────────────────────────────────────────────────────────── + +# Create the connector object using the schema and update functions. +connector = Connector(update=update, schema=schema) + +if __name__ == "__main__": + # Open the configuration.json file and load its contents. + with open("configuration.json", "r") as f: + configuration = json.load(f) + # Test the connector locally using the debug method. + connector.debug(configuration=configuration) diff --git a/wms_oracle/incremental.py b/wms_oracle/incremental.py new file mode 100644 index 0000000..f0c1b69 --- /dev/null +++ b/wms_oracle/incremental.py @@ -0,0 +1,197 @@ +"""Incremental sync for Oracle WMS entities. + +Two-phase approach: + + Phase 1 (mod_ts) — cursor-advancement pagination (ordering="mod_ts,id"): + Fetches records where mod_ts__gte=cursor and mod_ts__lt=sync_start_time. + After each full page, if the page's max mod_ts is greater than the cursor, + advance the cursor and restart from page 1. For same-timestamp batches + (page max == cursor), increment the page offset instead — safe because + ordering="mod_ts,id" gives stable offsets within a same-ts group. + Stops on empty page or partial page. + + Phase 2 (create_ts) — backdated-record catch-up (create_ts__gte=cursor): + Fetches records where create_ts__gte=cursor and create_ts__lt=sync_start_time. + A separate request is needed because Oracle ANDs all query params; catching records + that have an old mod_ts but were created after the cursor requires OR semantics. + Upserts are idempotent so overlap with Phase 1 is harmless. + Phase 2's max(mod_ts) is discarded — the cursor is driven by Phase 1 only, + since backdated records must not push the cursor forward. +""" + +import time +import requests +from datetime import datetime, timezone +from typing import Optional, Tuple + +from fivetran_connector_sdk import Logging as log + +from api import make_api_request, fetch_entity_data +from utils import ( + MIN_PAGE_SIZE, + INCREMENTAL_CHECKPOINT_INTERVAL_SECONDS, + normalize_timestamp_to_oracle_format, + OrderingNotSupportedError, +) + + +def run_incremental_phase( + base_url: str, + username: str, + password: str, + entity: str, + cursor: str, + sync_start_time: str, + page_size: int, + handle_records, + session: requests.Session, + checkpoint_fn=None, +) -> Tuple[int, Optional[str]]: + """ + Run Phase 1 (mod_ts cursor-advancement) and Phase 2 (create_ts catch-up). + + Returns: + (total_records, incremental_max_mod_ts) + incremental_max_mod_ts is None if no records were returned in Phase 1. + """ + cursor_dt = datetime.fromisoformat(cursor) + limit_str = normalize_timestamp_to_oracle_format(sync_start_time) + cursor_utc_str = cursor_dt.astimezone(timezone.utc).isoformat(timespec="seconds") + sync_start_utc_str = ( + datetime.fromisoformat(sync_start_time.replace("Z", "+00:00")) + .astimezone(timezone.utc) + .isoformat(timespec="seconds") + ) + + total_records = 0 + incremental_max_mod_ts = None + page_size_1a = page_size + phase1a_page = 1 + phase1a_pages = 0 + phase1a_records = 0 + last_checkpoint_wall = time.monotonic() + + log.info( + f"Starting incremental Phase 1 (mod_ts) for {entity}: " + f"{cursor_utc_str} → {sync_start_utc_str} (cursor={cursor})" + ) + + # ── Phase 1 (mod_ts) ────────────────────────────────────────────────────── + while True: + cursor_str = normalize_timestamp_to_oracle_format(cursor_dt.isoformat()) + + # Adaptive page-size retry on timeout + while True: + try: + response_data = make_api_request( + base_url, + username, + password, + entity, + page=phase1a_page, + mod_ts_filter=cursor_str, + mod_ts_lt_filter=limit_str, + ordering="mod_ts,id", + page_size=page_size_1a, + session=session, + ) + break + except requests.exceptions.Timeout: + if page_size_1a <= MIN_PAGE_SIZE: + log.error( + f"{entity}: Phase 1 (mod_ts) page {phase1a_page} timed out at minimum " + f"page_size={page_size_1a}, giving up" + ) + raise + page_size_1a = max(page_size_1a // 2, MIN_PAGE_SIZE) + phase1a_page = 1 + log.warning( + f"{entity}: Phase 1 (mod_ts) page timed out, restarting from page 1 " + f"with page_size={page_size_1a}" + ) + + records = response_data.get("results", []) + phase1a_pages += 1 + + if not records: + break + + handle_records(records) + total_records += len(records) + phase1a_records += len(records) + + page_max_ts = max( + (r["mod_ts"] for r in records if r.get("mod_ts")), + default=None, + ) + if page_max_ts: + if incremental_max_mod_ts is None or page_max_ts > incremental_max_mod_ts: + incremental_max_mod_ts = page_max_ts + page_max_dt = datetime.fromisoformat(page_max_ts) + if page_max_dt > cursor_dt: + # New timestamps seen — restart from page 1 at the advanced cursor + cursor_dt = page_max_dt + phase1a_page = 1 + if checkpoint_fn and ( + time.monotonic() - last_checkpoint_wall + >= INCREMENTAL_CHECKPOINT_INTERVAL_SECONDS + ): + checkpoint_fn(cursor_dt) + last_checkpoint_wall = time.monotonic() + else: + # Same-timestamp batch — advance the page offset (stable with id tiebreaker) + phase1a_page += 1 + else: + phase1a_page += 1 + + if len(records) < page_size_1a: + break # Partial page: no more records in this window + + log.info( + f"{entity}: Phase 1 (mod_ts) complete — {phase1a_records} records, {phase1a_pages} pages" + ) + + # ── Phase 2 (create_ts) ─────────────────────────────────────────────────── + log.info( + f"Starting incremental Phase 2 (create_ts) for {entity}: " + f"{cursor_utc_str} → {sync_start_utc_str} (cursor={cursor})" + ) + try: + count_1b, _, _ = fetch_entity_data( + base_url, + username, + password, + entity, + create_ts_gte_filter=cursor, + create_ts_lt_filter=sync_start_time, + ordering="create_ts,id", + page_size=page_size, + records_callback=handle_records, + session=session, + phase_label="Phase 2 (create_ts)", + ) + total_records += count_1b + except OrderingNotSupportedError: + log.warning( + f"{entity}: Phase 2 (create_ts) ordering=create_ts,id not supported " + f"— retrying without ordering" + ) + try: + count_1b, _, _ = fetch_entity_data( + base_url, + username, + password, + entity, + create_ts_gte_filter=cursor, + page_size=page_size, + records_callback=handle_records, + session=session, + phase_label="Phase 2 (create_ts)", + ) + total_records += count_1b + except Exception as e: + log.warning(f"{entity}: Phase 2 (create_ts) skipped — {e}") + except Exception as e: + log.warning(f"{entity}: Phase 2 (create_ts) skipped — {e}") + + return total_records, incremental_max_mod_ts diff --git a/wms_oracle/pre_sync_drift_check.py b/wms_oracle/pre_sync_drift_check.py new file mode 100644 index 0000000..458f240 --- /dev/null +++ b/wms_oracle/pre_sync_drift_check.py @@ -0,0 +1,284 @@ +"""Pre-sync hourly drift check for Oracle WMS connector. + +Before each sync, probes the count for every clock-aligned hourly window in the +24 hours preceding each entity's cursor. Compares against counts saved from the +prior sync, and re-pulls any hour whose count increased (long-running transactions +that committed after the cursor advanced past their mod_ts window). + +Window design +───────────── +24 full clock-aligned windows: [cursor_aligned - i*h, cursor_aligned - (i-1)*h) + Keyed by absolute UTC hour_str. Comparable across syncs. Saved to state. + +1 partial window (if cursor is not exactly on the hour): + [cursor_aligned, cursor_dt) + Upserted to Snowflake for visibility but NEVER compared against previous state + (this window legitimately grows each sync as the cursor advances within the + current hour and would produce false positives if compared). + NOT saved to state. + +Blind spot: transactions delayed by less than ~60 minutes (sub-hour gap) are not +detected here — that range falls within the current-hour partial window. +""" + +import time +import requests +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import datetime, timezone, timedelta +from typing import Dict, List, Tuple + +from fivetran_connector_sdk import Logging as log +from fivetran_connector_sdk import Operations as op + +from api import probe_entity_count, fetch_entity_data +from utils import MAX_CONCURRENT_ENTITIES + + +def run_pre_cursor_hourly_check( + base_url: str, + username: str, + password: str, + entities: List[str], + entity_cursors: Dict[str, str], + sync_start_time: str, + prev_hourly_counts: Dict[str, Dict[str, int]], + page_size: int, + hours: int = 24, +) -> Tuple[Dict[str, Dict[str, int]], List[Tuple[str, str]]]: + """ + Probe hourly counts for the 24 hours before each entity's cursor, detect + increases, re-pull and verify any triggered windows. + + Args: + entities: Entities that have an incremental cursor in state. + entity_cursors: {entity: cursor_str} — current state values. + sync_start_time: Used as batch_id for Snowflake rows. + prev_hourly_counts: {entity: {hour_str: count}} — saved from prior sync. + page_size: Page size used for re-pull pagination. + + Returns: + (new_counts, summary_lines) where: + new_counts: {entity: {hour_str: count}} — full windows only, saved to state. + summary_lines: [("info"|"warning", message)] — repull log lines to replay + at the end of the sync summary. Empty if no repulls triggered. + """ + hourly_check_start = time.time() + batch_id = sync_start_time + + # ── Build task list ─────────────────────────────────────────────────────── + # Each task: (entity, hour_str, gte, lt, is_partial) + tasks: List[Tuple[str, str, str, str, bool]] = [] + + for entity in entities: + cursor_str = entity_cursors[entity] + cursor_dt = datetime.fromisoformat(cursor_str.replace("Z", "+00:00")).astimezone( + timezone.utc + ) + cursor_aligned = cursor_dt.replace(minute=0, second=0, microsecond=0) + + # Full clock-aligned hourly windows (oldest → newest) + for hour_offset in range(hours, 0, -1): + window_start = cursor_aligned - timedelta(hours=hour_offset) + window_end = cursor_aligned - timedelta(hours=hour_offset - 1) + hour_str = window_start.strftime("%Y-%m-%dT%H:%M:%SZ") + gte = window_start.strftime("%Y-%m-%dT%H:%M:%SZ") + lt = window_end.strftime("%Y-%m-%dT%H:%M:%SZ") + tasks.append((entity, hour_str, gte, lt, False)) + + # 1 partial window: [cursor_aligned, cursor_dt) — only if sub-hour gap exists + if cursor_dt > cursor_aligned: + hour_str = cursor_aligned.strftime("%Y-%m-%dT%H:%M:%SZ") + gte = cursor_aligned.strftime("%Y-%m-%dT%H:%M:%SZ") + lt = cursor_dt.strftime("%Y-%m-%dT%H:%M:%SZ") + tasks.append((entity, hour_str, gte, lt, True)) + + if not tasks: + log.info("Pre-cursor hourly check: no entities with cursors, skipping") + return {}, [] + + # ── Probe counts in parallel ────────────────────────────────────────────── + new_counts: Dict[str, Dict[str, int]] = {} + hours_to_repull: List[Tuple[str, str, str, str, int, int]] = [] + summary_lines: List[Tuple[str, str]] = [] + + def fetch_count(task: Tuple[str, str, str, str, bool]): + entity, hour_str, gte, lt, is_partial = task + count = probe_entity_count( + base_url, + username, + password, + entity, + mod_ts_filter=gte, + mod_ts_lt_filter=lt, + ) + return entity, hour_str, gte, lt, is_partial, count + + with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_ENTITIES) as executor: + futures = {executor.submit(fetch_count, task): task for task in tasks} + for future in as_completed(futures): + entity, hour_str, gte, lt, is_partial, count = future.result() + + # Full windows: compare and save to state + if not is_partial: + prev_count = prev_hourly_counts.get(entity, {}).get(hour_str) + if prev_count is not None and count > prev_count: + log.warning( + f"Pre-cursor drift: {entity} {hour_str} count increased " + f"{prev_count} → {count}" + ) + hours_to_repull.append((entity, hour_str, gte, lt, prev_count, count)) + + if entity not in new_counts: + new_counts[entity] = {} + new_counts[entity][hour_str] = count + + # Upsert hourly count into the monitoring table for observability. + op.upsert( + "pre_cursor_hourly_counts", + { + "table_name": entity, + "hour_start": hour_str, + "record_count": count, + "batch_id": batch_id, + "is_partial": is_partial, + }, + ) + + # ── Re-pull triggered hours (sequential) ───────────────────────────────── + for entity, hour_str, gte, lt, prev_count, new_count in hours_to_repull: + drift_msg = ( + f"Pre-cursor drift: {entity} {hour_str} count increased {prev_count} → {new_count}" + ) + log.warning(drift_msg) + summary_lines.append(("warning", drift_msg)) + + repull_msg = ( + f"Re-pulling {entity} {hour_str} (count increased {prev_count} → {new_count})…" + ) + log.info(repull_msg) + summary_lines.append(("info", repull_msg)) + + with requests.Session() as session: + total_pulled, _, _ = fetch_entity_data( + base_url, + username, + password, + entity, + mod_ts_filter=gte, + mod_ts_lt_filter=lt, + ordering="mod_ts,id", + page_size=page_size, + records_callback=lambda records: [op.upsert(entity, r) for r in records], + session=session, + phase_label=f"pre-cursor re-pull {hour_str}", + ) + + complete_msg = f"{entity}: pre-cursor re-pull {hour_str} complete — {total_pulled} records" + log.info(complete_msg) + summary_lines.append(("info", complete_msg)) + + # Verify post-repull count matches what triggered the re-pull + verified_count = probe_entity_count( + base_url, + username, + password, + entity, + mod_ts_filter=gte, + mod_ts_lt_filter=lt, + ) + if verified_count == new_count: + verify_msg = ( + f"Verified: {entity} {hour_str} confirmed at {new_count} " + f"({total_pulled} records re-pulled)" + ) + log.info(verify_msg) + summary_lines.append(("info", verify_msg)) + else: + verify_msg = ( + f"Unverified: {entity} {hour_str} expected {new_count}, " + f"got {verified_count} — data may still be changing" + ) + log.warning(verify_msg) + summary_lines.append(("warning", verify_msg)) + + elapsed = round(time.time() - hourly_check_start, 1) + check_summary = ( + f"Pre-cursor hourly check: {len(tasks)} windows probed across {len(entities)} entities " + f"({hours}h lookback) in {elapsed}s | re-pulls triggered: {len(hours_to_repull)}" + ) + log.info(check_summary) + summary_lines.append(("info", check_summary)) + + return new_counts, summary_lines + + +def run_daily_counts( + base_url: str, + username: str, + password: str, + entities: List[str], + sync_start_time: str, + days: int = 30, +) -> None: + """ + Probe daily mod_ts counts for the last N calendar days for each incremental entity + and upsert to counts_by_day. Runs all probes in parallel. + + Args: + entities: Incremental entities (those with mod_ts support and a cursor). + sync_start_time: Used as batch_id and as the reference point for day alignment. + days: Number of calendar days to cover (default 30). + """ + if not entities: + return + + batch_id = sync_start_time + sync_dt = datetime.fromisoformat(sync_start_time.replace("Z", "+00:00")).astimezone( + timezone.utc + ) + today = sync_dt.replace(hour=0, minute=0, second=0, microsecond=0) + + # Build one task per (entity, calendar day) + tasks: List[tuple] = [] + for entity in entities: + for day_offset in range(days): + day_start = today - timedelta(days=day_offset) + day_end = day_start + timedelta(days=1) + day_str = day_start.strftime("%Y-%m-%d") + gte = day_start.strftime("%Y-%m-%dT%H:%M:%SZ") + lt = day_end.strftime("%Y-%m-%dT%H:%M:%SZ") + tasks.append((entity, day_str, gte, lt)) + + def fetch_count(task: tuple): + entity, day_str, gte, lt = task + count = probe_entity_count( + base_url, + username, + password, + entity, + mod_ts_filter=gte, + mod_ts_lt_filter=lt, + ) + return entity, day_str, count + + daily_counts_start = time.time() + with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_ENTITIES) as executor: + futures = {executor.submit(fetch_count, task): task for task in tasks} + for future in as_completed(futures): + entity, day_str, count = future.result() + # Upsert daily count into the monitoring table for observability. + op.upsert( + "counts_by_day", + { + "table_name": entity, + "mod_ts_day": day_str, + "record_count": count, + "batch_id": batch_id, + }, + ) + + elapsed = round(time.time() - daily_counts_start, 1) + log.info( + f"Daily counts: {len(tasks)} windows probed across {len(entities)} entities " + f"({days}d lookback) in {elapsed}s" + ) diff --git a/wms_oracle/utils.py b/wms_oracle/utils.py new file mode 100644 index 0000000..6a2c6c5 --- /dev/null +++ b/wms_oracle/utils.py @@ -0,0 +1,107 @@ +"""Constants, entity list, custom exceptions, and timestamp utilities.""" + +from datetime import datetime, timezone + +# ── Entity list ─────────────────────────────────────────────────────────────── + +ORACLE_WMS_ENTITIES = [ + "order_dtl", + "inventory", + "container", + "container_lock_xref", + "order_hdr", + "allocation", + "inventory_attribute", + "item", + "batch_number", + "purchase_order_dtl", + "ib_shipment", + "ib_shipment_dtl", + "purchase_order_hdr", + "inventory_lock", + "location", + "item_metric", + "ib_container", + "history_activity", + "putaway_type", + "order_type", + "purchase_order_status", + "order_status", + "inventory_status", + "vendor", + "company", + "facility", + # "inventory_history", # Uncomment if your Oracle WMS instance supports this entity +] + + +# ── Custom exceptions ───────────────────────────────────────────────────────── + + +class OrderingNotSupportedError(Exception): + """Raised when the API returns 400 for a request with an ordering parameter. + Indicates the entity does not support ordering on that field. Do not retry.""" + + pass + + +# ── Sync tuning constants ───────────────────────────────────────────────────── + +API_VERSION = "v10" +DEFAULT_PAGE_SIZE = 1000 +MIN_PAGE_SIZE = 25 +CHECKPOINT_INTERVAL_PAGES = 10 +MAX_RETRIES = 5 +INITIAL_BACKOFF_SECONDS = 1 +MAX_CONCURRENT_ENTITIES = 10 +DEFAULT_MAX_PAGES = 100 +BACKFILL_WINDOW_DAYS = 30 # Width of each backfill fetch window +BACKFILL_MAX_EMPTY_WINDOWS = 12 # Stop after this many consecutive empty windows (~1 year) +# Max wall-clock seconds between incremental checkpoints +INCREMENTAL_CHECKPOINT_INTERVAL_SECONDS = 600 + + +# ── Configuration validation ────────────────────────────────────────────────── + + +def validate_configuration(configuration: dict): + """Raise ValueError if any required configuration fields are missing or invalid.""" + for key in ["base_url", "username", "password"]: + if key not in configuration: + raise ValueError(f"Missing required configuration value: {key}") + + if not configuration.get("base_url", "").startswith("https://"): + raise ValueError("base_url must start with https://") + + try: + int(configuration.get("page_size", str(DEFAULT_PAGE_SIZE))) + except ValueError: + raise ValueError("page_size must be a valid integer") + + +# ── Timestamp utilities ─────────────────────────────────────────────────────── + + +def get_current_timestamp() -> str: + """Return the current UTC time as an ISO string with timezone offset.""" + return datetime.now(timezone.utc).isoformat(timespec="seconds") + + +def normalize_timestamp_to_oracle_format(timestamp_str: str) -> str: + """Round a timestamp to second precision, preserving timezone offset. + Oracle WMS rejects sub-second precision in query parameters. + """ + try: + dt = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00")) + return dt.isoformat(timespec="seconds") + except (ValueError, AttributeError): + return timestamp_str + + +def to_utc(timestamp_str: str) -> str: + """Convert any ISO timestamp string to UTC with full microsecond precision.""" + try: + dt = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00")) + return dt.astimezone(timezone.utc).isoformat() + except (ValueError, AttributeError): + return timestamp_str