diff --git a/README.md b/README.md index 289db64..0e88a28 100644 --- a/README.md +++ b/README.md @@ -85,6 +85,7 @@ For SDK installation and setup, visit the main [Fivetran Connector SDK repositor - **[amazon_video_central](https://github.com/fivetran/community_connectors/tree/main/amazon_video_central)** - Sync report data from Amazon Video Central API - **[awardco](https://github.com/fivetran/community_connectors/tree/main/awardco)** - Sync data from Awardco rewards platform - **[betterstack](https://github.com/fivetran/community_connectors/tree/main/betterstack)** - Sync uptime monitoring data from Better Stack +- **[callminer](https://github.com/fivetran/fivetran_csdk_connectors/tree/main/callminer)** - Sync CallMiner Bulk Export data using OAuth2 authentication, export job polling, archive extraction, and per-data-type incremental state tracking - **[checkly](https://github.com/fivetran/community_connectors/tree/main/checkly)** - Sync monitoring check data and analytics from Checkly - **[clerk](https://github.com/fivetran/community_connectors/tree/main/clerk)** - Sync user data from Clerk authentication - **[commonpaper](https://github.com/fivetran/community_connectors/tree/main/commonpaper)** - Sync agreement data from Common Paper diff --git a/callminer/README.md b/callminer/README.md new file mode 100644 index 0000000..363894f --- /dev/null +++ b/callminer/README.md @@ -0,0 +1,153 @@ +# CallMiner Connector Example + +## Connector overview + +The CallMiner connector for Fivetran uses the CallMiner Bulk Export API to create export jobs, poll job status, download completed export archives, extract nested compressed CSV files, and sync the exported records to your destination. 
+ +## Requirements + +- [Supported Python versions](https://github.com/fivetran/fivetran_csdk_connectors/blob/main/README.md#requirements) +- Operating system: + - Windows: 10 or later (64-bit only) + - macOS: 13 (Ventura) or later (Apple Silicon [arm64] or Intel [x86_64]) + - Linux: Distributions such as Ubuntu 20.04 or later, Debian 10 or later, or Amazon Linux 2 or later (arm64 or x86_64) + +## Getting started + +Refer to the [Connector SDK Setup Guide](https://fivetran.com/docs/connectors/connector-sdk/setup-guide) to get started. + +To initialize a new Connector SDK project using this connector as a starting point, run: + +```bash +fivetran init --template callminer +``` + +`fivetran init` initializes a new Connector SDK project by setting up the project structure, configuration files, and a connector you can run immediately with `fivetran debug`. For more information on `fivetran init`, refer to the [Connector SDK `init` documentation](https://fivetran.com/docs/connector-sdk/connector-development-and-configuration/connector-sdk-commands#fivetraninit). + +> Note: Ensure you have updated the `configuration.json` file with the necessary parameters before running `fivetran debug`. See the [Configuration file](#configuration-file) section for details on the required configuration parameters. + +## Features + +- OAuth2 client credentials authentication with automatic token refresh. +- Bulk export job creation, polling, download, and cleanup. +- Incremental sync windows using a configurable `initial_start_date` and `increment_days`. +- Recent sync optimization using CallMiner's `LastNHours` export option when state is close to current time. +- Per-data-type state tracking through the `data_types` state object. +- Pending job recovery through the `pending_job` state object when an export does not finish within the configured polling window. +- Nested archive handling for outer ZIP files with inner `.gz` or `.zip` CSV payloads. 
+- Parallel nested archive extraction with a configurable `max_threads` value. Fivetran SDK operations run on the main thread. +- Optional `max_records` and `test_job_id` settings for local testing. + +## Configuration file + +The connector requires the following configuration parameters in the `configuration.json` file. This configuration is uploaded to Fivetran and defines how the connector authenticates with CallMiner and selects Bulk Export data. + +```json +{ + "client_id": "", + "client_secret": "", + "initial_start_date": "", + "increment_days": "", + "max_threads": "", + "max_polls": "", + "email_recipients": "", + "data_types": "" +} +``` + +Required parameters: + +- `client_id`: CallMiner OAuth2 client ID. +- `client_secret`: CallMiner OAuth2 client secret. +- `initial_start_date`: Starting timestamp for the first sync, formatted as `YYYY-MM-DDTHH:MM:SS.000Z`. +- `data_types`: Comma-separated list of CallMiner Bulk Export data types to request. + +Optional parameters: + +- `increment_days`: Number of days per initial or catch-up sync period. The default is `10`. +- `email_recipients`: Comma-separated email addresses for CallMiner export job notifications. +- `max_records`: Maximum records to process per file for local testing. Omit this value for full syncs. +- `max_threads`: Maximum number of parallel archive extraction threads. The default is `8`; valid values are `1` through `16`. +- `max_polls`: Maximum number of job polling attempts. The default is `60`, with one poll per minute in normal sync mode. +- `test_job_id`: Existing CallMiner export job ID for local testing. When set, the connector skips job creation and processes the specified job. + +> Note: When submitting connector code as a [Community Connector](https://github.com/fivetran/fivetran_csdk_connectors/tree/main) in the open-source [Connector SDK repository](https://github.com/fivetran/fivetran_csdk_connectors/tree/main), ensure the `configuration.json` file has placeholder values. 
When adding the connector to your production repository, ensure that the `configuration.json` file is not checked into version control to protect sensitive information. + +## Requirements file + +This connector does not require a `requirements.txt` file. It uses Python standard libraries, the Fivetran Connector SDK, and the `requests` library, which are available in the Connector SDK runtime environment. + +> Note: [Some packages](https://fivetran.com/docs/connector-sdk/technical-reference#preinstalledpackages) are pre-installed in the Connector SDK runtime environment. To avoid dependency conflicts, do not declare them in your `requirements.txt`. + +## Authentication + +The connector uses the OAuth2 client credentials flow through the CallMiner identity provider. The `get_access_token` function posts the configured `client_id` and `client_secret` to the token endpoint and returns the bearer token used for API calls. + +Before you can request a token from CallMiner's Identity Provider Service, you'll need to request a Client ID and Secret from `support@callminer.com`. + +Tokens are refreshed before expiration by `refresh_token_if_needed`, using a five-minute buffer to avoid using an expired token during long-running export jobs. + +## Bulk export workflow + +The connector syncs CallMiner data through Bulk Export jobs rather than direct row pagination. + +1. The `update` function validates and parses configuration. +2. The connector determines whether each configured data type should use an incremental date window or the `LastNHours` option. +3. Data types with the same sync strategy are grouped into one Bulk Export job. +4. The connector polls job history until the job is completed, failed, or times out. +5. Completed jobs are downloaded, extracted, and processed into destination tables. +6. The connector checkpoints state after each completed sync period. +7. Completed jobs are deleted after checkpointing. 
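
The polling step in the workflow above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the connector's actual `sync.py` code: `poll_until_done`, `fetch_status`, and the injectable `sleep` parameter are hypothetical names, and the status strings `"Completed"` and `"Failed"` are taken from the `check_job_status` docstring.

```python
import time
from typing import Any, Callable, Dict


def poll_until_done(
    job_id: str,
    fetch_status: Callable[[str], Dict[str, Any]],
    max_polls: int = 60,
    poll_interval: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Any]:
    """Poll a job until it completes, fails, or the poll budget is spent.

    Hypothetical helper: `fetch_status` stands in for a status lookup that
    returns a dict with a "status" key, in the style of `check_job_status`.
    """
    for attempt in range(1, max_polls + 1):
        result = fetch_status(job_id)
        status = result.get("status")
        if status == "Completed":
            return result
        if status == "Failed":
            raise ValueError(f"Export job {job_id} failed")
        # Any other status (for example "Processing") means keep waiting,
        # but skip the sleep once the poll budget is exhausted.
        if attempt < max_polls:
            sleep(poll_interval)
    raise TimeoutError(f"Export job {job_id} not finished after {max_polls} polls")
```

Passing `sleep` as a parameter is only a convenience for this sketch: it lets the loop be exercised in tests without waiting for real poll intervals.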
+ +If a job times out, the connector stores the job details in `pending_job` and checkpoints state before raising an error. The next sync resumes polling the same job before continuing normal sync work. + +## Data handling + +The connector downloads each completed CallMiner export as an outer ZIP file. The outer ZIP can contain metadata JSON and one or more nested `.gz` or `.zip` files with CSV data. + +CSV files are processed as streams with `csv.DictReader`. Table names are derived from exported filenames by removing UUID prefixes and file extensions, then normalizing names to lowercase with underscores. Worker threads decompress nested files into temporary CSV files without calling Fivetran SDK operations. The main thread reads those temporary CSV files and delivers records with `op.upsert`. + +Nested files are extracted in parallel with `ThreadPoolExecutor`. Files are sorted by size before extraction so larger files start earlier. Materialized CSV files are then upserted on the main thread in archive order and removed after processing. + +## Error handling + +The connector implements targeted error handling across authentication, API requests, job polling, and file processing. + +- API retries: The `retry_on_500_error` decorator retries HTTP 500-level errors with exponential backoff. +- Request errors: API request failures are logged and raised with the original `requests` exception. +- Job failures: Failed export jobs raise `ValueError` and are not deleted, allowing manual inspection in CallMiner. +- Job timeouts: Timed-out jobs are saved in state and resumed on the next sync. +- File processing errors: Gzip, ZIP, CSV, encoding, and unexpected file errors are tracked and logged with error statistics. + +## Tables created + +The connector defines primary keys for known CallMiner export tables in the `schema` function. Columns are inferred from the CSV headers returned by CallMiner. 
+ +| Table name | Primary key | +| ---------- | ----------- | +| `ai_summaries` | `contact_id` | +| `comments` | `comment_id` | +| `contacts` | `id` | +| `categories` | `contact_id`, `category_id`, `section_id` | +| `category_components` | `contact_id`, `category_id`, `component_id`, `start_time` | +| `events_delay` | `contact_id`, `start_time`, `end_time` | +| `events_overtalk` | `contact_id`, `start_time`, `end_time` | +| `events_redaction` | `contact_id`, `start_time`, `end_time` | +| `events_silence` | `contact_id`, `start_time`, `end_time` | +| `scores` | `contact_id`, `score_id` | +| `score_indicators` | `contact_id`, `score_id`, `score_component_id` | +| `tags` | `contact_id`, `tag_id` | +| `transcripts` | `contact_id`, `start_time` | + +## Additional files + +- `auth.py`: Handles OAuth2 token requests, token refresh, and retry behavior for 500-level API errors. +- `api_client.py`: Creates export jobs, retrieves job history, checks job status, and deletes completed jobs. +- `config.py`: Validates and parses connector configuration values. +- `file_processing.py`: Downloads export files, extracts nested archives, processes CSV streams, and tracks file processing errors. +- `state.py`: Reads and updates per-data-type sync state. +- `sync.py`: Determines sync strategy, orchestrates export jobs, polls job status, checkpoints progress, and resumes pending jobs. + +## Additional considerations + +The examples provided are intended to help you effectively use Fivetran's Connector SDK. While we've tested the code, Fivetran cannot be held responsible for any unexpected or negative consequences that may arise from using these examples. For inquiries, please reach out to our Support team. diff --git a/callminer/api_client.py b/callminer/api_client.py new file mode 100644 index 0000000..ce7464c --- /dev/null +++ b/callminer/api_client.py @@ -0,0 +1,205 @@ +""" +CallMiner API client functions for job management. 
+""" + +import requests +from typing import Dict, Any +from fivetran_connector_sdk import Logging as log +from auth import retry_on_500_error + + +@retry_on_500_error(max_retries=3, initial_delay=1, backoff_factor=2) +def create_job( + bearer_token: str, + start_date: str = None, + end_date: str = None, + last_n_hours: int = None, + data_types: list = None, + email_recipients: list = None, +) -> Dict[str, Any]: + """ + Create a new export job for contacts. + + Args: + bearer_token: Bearer token for authentication + start_date: Start date in ISO format (e.g., "2025-09-01T00:00:00.000Z") + end_date: End date in ISO format (e.g., "2025-10-01T00:00:00.000Z") + last_n_hours: Number of hours to look back (alternative to date range) + data_types: List of data types to export (default: ["Contacts"]) + email_recipients: List of email addresses for notifications + + Returns: + Dictionary containing: + - job_id: The job ID + - create_date: The job creation date + - full_response: Complete API response + """ + url = "https://api.callminer.net/bulkexport/api/export/job" + + headers = {"Authorization": f"Bearer {bearer_token}", "Content-Type": "application/json"} + + if data_types is None: + data_types = ["Contacts"] + + if email_recipients is None: + email_recipients = [] + + # Generate descriptive name based on data types + if len(data_types) == 1: + job_name = f"{data_types[0]} Export" + else: + job_name = f"Bulk Export ({len(data_types)} data types)" + + # Build duration based on whether we're using LastNHours or custom date range + if last_n_hours is not None: + # Use LastNHours for subsequent syncs + duration = { + "LastNDays": None, + "LastNHours": last_n_hours, + "TimeFrame": None, + "StartDate": None, + "EndDate": None, + "SearchMode": "NewAndUpdated", + } + log.info(f"Creating job for last {last_n_hours} hours") + else: + # Use custom date range for initial sync + duration = { + "LastNDays": None, + "LastNHours": None, + "TimeFrame": "Custom", + "StartDate": 
start_date, + "EndDate": end_date, + "SearchMode": "NewAndUpdated", + } + log.info(f"Creating job for date range: {start_date} to {end_date}") + + payload = { + "Name": job_name, + "Duration": duration, + "DataTypes": data_types, + "SearchFilters": [], + "NotificationMethod": "Email", + "EmailRecipients": email_recipients, + } + + try: + response = requests.post(url, headers=headers, json=payload, timeout=(30, 60)) + response.raise_for_status() + + job_data = response.json() + job_id = job_data.get("Id") + create_date = job_data.get("CreateDate") + + if not job_id: + log.error(f"No job ID in response: {job_data}") + raise ValueError("Failed to get job ID from response") + + log.info(f"Successfully created job with ID: {job_id}") + + return {"job_id": job_id, "create_date": create_date, "full_response": job_data} + + except requests.exceptions.RequestException as e: + log.error(f"Error creating job: {e}") + raise + + +@retry_on_500_error(max_retries=3, initial_delay=1, backoff_factor=2) +def get_jobs_history(bearer_token: str) -> list: + """ + Get the history of all jobs. + + Args: + bearer_token: Bearer token for authentication + + Returns: + List of job dictionaries containing job history + """ + url = "https://api.callminer.net/bulkexport/api/export/history" + + headers = {"Authorization": f"Bearer {bearer_token}"} + + try: + response = requests.get(url, headers=headers, timeout=(30, 60)) + response.raise_for_status() + + jobs = response.json() + + # A non-list payload is unexpected; treat it as an empty history + if not isinstance(jobs, list): + jobs = [] + + log.info(f"Retrieved {len(jobs)} jobs from history") + + return jobs + + except requests.exceptions.RequestException as e: + log.error(f"Error getting jobs history: {e}") + raise + + +@retry_on_500_error(max_retries=3, initial_delay=1, backoff_factor=2) +def delete_job(job_id: str, bearer_token: str) -> None: + """ + Delete an export job. 
+ + Args: + job_id: The job ID to delete + bearer_token: Bearer token for authentication + """ + url = f"https://api.callminer.net/bulkexport/api/export/job/{job_id}" + + headers = {"Authorization": f"Bearer {bearer_token}"} + + log.info(f"Deleting job: {job_id}") + + try: + response = requests.delete(url, headers=headers, timeout=(30, 60)) + response.raise_for_status() + + log.info(f"Successfully deleted job: {job_id}") + + except requests.exceptions.RequestException as e: + log.error(f"Error deleting job {job_id}: {e}") + # Don't raise - deletion failure shouldn't fail the sync + # Job will eventually expire on CallMiner's side + + +def check_job_status( + job_id: str, bearer_token: str = None, jobs_history: list = None +) -> Dict[str, Any]: + """ + Check the status of a job by searching the jobs history. + + Args: + job_id: The ID of the job to check (matches ExportJobId in history) + bearer_token: Bearer token for authentication + (optional if jobs_history provided) + jobs_history: Pre-fetched jobs history (optional, fetches if not provided) + + Returns: + Dictionary containing job status information with keys: + - status: Job status (e.g., "Completed", "Processing", "Failed") + - download_endpoint: Download endpoint if job is completed (optional) + - found: Boolean indicating if job was found in history + """ + if jobs_history is None: + if bearer_token is None: + raise ValueError("Either jobs_history or bearer_token must be provided") + jobs_history = get_jobs_history(bearer_token) + + for job in jobs_history: + export_job_id = job.get("ExportJobId") + + if export_job_id == job_id: + status = job.get("Status") + download_endpoint = job.get("DownloadEndpoint") + + result = {"status": status, "found": True, "job_data": job} + + if download_endpoint: + result["download_endpoint"] = download_endpoint + + return result + + return {"status": None, "found": False, "job_data": None} diff --git a/callminer/auth.py b/callminer/auth.py new file mode 100644 index 
0000000..e02dfdd --- /dev/null +++ b/callminer/auth.py @@ -0,0 +1,160 @@ +""" +Authentication and token management for CallMiner API. +""" + +import requests +import time +from typing import Tuple, Callable +from datetime import datetime, timedelta +from functools import wraps +from fivetran_connector_sdk import Logging as log + + +def retry_on_500_error(max_retries: int = 3, initial_delay: int = 1, backoff_factor: int = 2): + """ + Decorator to retry API calls on 500-level errors with exponential backoff. + + Args: + max_retries: Maximum number of retry attempts (default: 3) + initial_delay: Initial delay in seconds before first retry (default: 1) + backoff_factor: Multiplier for exponential backoff (default: 2) + + Returns: + Decorated function with retry logic + """ + + def decorator(func: Callable) -> Callable: + @wraps(func) + def wrapper(*args, **kwargs): + last_exception = None + + for attempt in range(max_retries + 1): + try: + return func(*args, **kwargs) + + except requests.exceptions.HTTPError as e: + last_exception = e + + # Check if it's a 500-level error + if e.response is not None and e.response.status_code >= 500: + if attempt < max_retries: + # Calculate delay with exponential backoff + delay = initial_delay * (backoff_factor**attempt) + + log.warning( + f"HTTP {e.response.status_code} error in " + f"{func.__name__}. Retrying in {delay} seconds " + f"(attempt {attempt + 1}/{max_retries})..." + ) + + time.sleep(delay) + continue + else: + log.error( + f"HTTP {e.response.status_code} error in " + f"{func.__name__}. Max retries " + f"({max_retries}) exceeded." 
+ ) + + # Re-raise if not a 500 error or max retries exceeded + raise + + except requests.exceptions.RequestException as e: + # For non-HTTP errors (timeout, connection, etc), don't retry + log.error(f"Request exception in {func.__name__}: {e}") + raise + + # If we get here, we've exhausted retries + if last_exception: + raise last_exception + + return wrapper + + return decorator + + +@retry_on_500_error(max_retries=3, initial_delay=1, backoff_factor=2) +def get_access_token(client_id: str, client_secret: str) -> Tuple[str, int]: + """ + Get an access token using OAuth2 client credentials flow. + + Args: + client_id: Client ID for authentication + client_secret: Client secret for authentication + + Returns: + Tuple of (access_token, expires_in_seconds) + """ + url = "https://idp.callminer.net/connect/token" + + headers = {"Content-Type": "application/x-www-form-urlencoded"} + + data = { + "client_id": client_id, + "client_secret": client_secret, + "grant_type": "client_credentials", + } + + log.info("Requesting access token") + + try: + response = requests.post(url, headers=headers, data=data, timeout=(30, 60), verify=True) + response.raise_for_status() + + token_data = response.json() + access_token = token_data.get("access_token") + expires_in = token_data.get("expires_in", 3600) + + if not access_token: + log.error("No access_token in response") + raise ValueError("Failed to obtain access token") + + log.info("Successfully obtained access token") + return access_token, expires_in + + except requests.exceptions.RequestException as e: + log.error(f"Error obtaining access token: {e}") + raise + + +def get_token(client_id: str, client_secret: str) -> Tuple[str, datetime]: + """ + Get a fresh access token and its expiration time. 
+ + Args: + client_id: Client ID for authentication + client_secret: Client secret for authentication + + Returns: + Tuple of (bearer_token, expiration_datetime) + """ + log.info("Requesting new access token") + bearer_token, expires_in = get_access_token(client_id, client_secret) + + # Calculate expiration with 5 minute buffer + expires_at = datetime.utcnow() + timedelta(seconds=expires_in - 300) + log.info(f"Token expires at: {expires_at.strftime('%Y-%m-%dT%H:%M:%SZ')}") + + return bearer_token, expires_at + + +def refresh_token_if_needed( + client_id: str, client_secret: str, current_token: str, expires_at: datetime +) -> Tuple[str, datetime]: + """ + Check if token needs refresh and get new one if needed. + + Args: + client_id: Client ID for authentication + client_secret: Client secret for authentication + current_token: Current bearer token + expires_at: When current token expires + + Returns: + Tuple of (bearer_token, expiration_datetime) + """ + if datetime.utcnow() >= expires_at: + log.info("Token expired, refreshing...") + return get_token(client_id, client_secret) + else: + return current_token, expires_at diff --git a/callminer/config.py b/callminer/config.py new file mode 100644 index 0000000..618c6d0 --- /dev/null +++ b/callminer/config.py @@ -0,0 +1,138 @@ +""" +Configuration parsing and validation utilities. +""" + +from typing import Dict, Any +from fivetran_connector_sdk import Logging as log + + +def validate_configuration(configuration: Dict[str, Any]) -> None: + """ + Validate required configuration parameters. 
+ + Args: + configuration: Configuration dictionary to validate + + Raises: + ValueError: If any required configuration is missing + """ + if not configuration.get("client_id"): + log.error("client_id not found in configuration") + raise ValueError("client_id is required in configuration") + + if not configuration.get("client_secret"): + log.error("client_secret not found in configuration") + raise ValueError("client_secret is required in configuration") + + if not configuration.get("initial_start_date"): + log.error("initial_start_date not found in configuration") + raise ValueError("initial_start_date is required in configuration") + + +def parse_configuration(configuration: Dict[str, Any]) -> Dict[str, Any]: + """ + Parse and validate configuration values. + + Args: + configuration: Configuration dictionary + + Returns: + Dictionary with keys: + - client_id: Client ID string + - client_secret: Client secret string + - initial_start_date: ISO format date string + - max_records: Integer or None (no limit) + - increment_days: Integer (default 10) + - max_threads: Integer (default 8) + - max_polls: Integer (default 60) + - email_recipients: List of email strings + - data_types: List of data type strings + - test_job_id: String or None (optional, for testing mode) + """ + # Parse max_records (None means no limit) + max_records = None + max_records_str = configuration.get("max_records", "") + if max_records_str: + try: + max_records = int(max_records_str) + if max_records <= 0: + max_records = None + except ValueError: + log.warning(f"Invalid max_records value: {max_records_str}") + max_records = None + + # Parse increment_days (default to 10) + increment_days = 10 + increment_days_str = configuration.get("increment_days", "10") + if increment_days_str: + try: + increment_days = int(increment_days_str) + if increment_days <= 0: + log.warning( + f"Invalid increment_days value: {increment_days_str}, " f"using default: 10" + ) + increment_days = 10 + except ValueError: + 
log.warning( + f"Invalid increment_days value: {increment_days_str}, " f"using default: 10" + ) + increment_days = 10 + + # Parse max_threads (default to 8) + max_threads = 8 + max_threads_str = configuration.get("max_threads", "8") + if max_threads_str: + try: + max_threads = int(max_threads_str) + if max_threads <= 0 or max_threads > 16: + log.warning( + f"Invalid max_threads value: {max_threads_str}, " + f"using default: 8 (valid range: 1-16)" + ) + max_threads = 8 + except ValueError: + log.warning(f"Invalid max_threads value: {max_threads_str}, " f"using default: 8") + max_threads = 8 + + # Parse max_polls (default to 60) + max_polls = 60 + max_polls_str = configuration.get("max_polls", "60") + if max_polls_str: + try: + max_polls = int(max_polls_str) + if max_polls <= 0: + log.warning(f"Invalid max_polls value: {max_polls_str}, " f"using default: 60") + max_polls = 60 + except ValueError: + log.warning(f"Invalid max_polls value: {max_polls_str}, " f"using default: 60") + max_polls = 60 + + # Parse email recipients + email_recipients = [] + email_recipients_str = configuration.get("email_recipients", "") + if email_recipients_str: + email_recipients = [ + email.strip() for email in email_recipients_str.split(",") if email.strip() + ] + + # Parse data types + data_types_str = configuration.get("data_types", "Contacts") + data_types = [dt.strip() for dt in data_types_str.split(",") if dt.strip()] + + # Parse test_job_id (optional, for testing with existing jobs) + test_job_id = configuration.get("test_job_id", "").strip() + if test_job_id: + log.info(f"TESTING MODE: Using existing job ID: {test_job_id}") + + return { + "client_id": configuration.get("client_id"), + "client_secret": configuration.get("client_secret"), + "initial_start_date": configuration.get("initial_start_date"), + "max_records": max_records, + "increment_days": increment_days, + "max_threads": max_threads, + "max_polls": max_polls, + "email_recipients": email_recipients, + "data_types": 
data_types, + "test_job_id": test_job_id if test_job_id else None, + } diff --git a/callminer/configuration.json b/callminer/configuration.json new file mode 100644 index 0000000..f1d424b --- /dev/null +++ b/callminer/configuration.json @@ -0,0 +1,11 @@ +{ + "client_id": "", + "client_secret": "", + "initial_start_date": "", + "increment_days": "", + "max_threads": "", + "max_polls": "", + "email_recipients": "", + "data_types": "" +} + diff --git a/callminer/connector.py b/callminer/connector.py new file mode 100644 index 0000000..cb8c0cf --- /dev/null +++ b/callminer/connector.py @@ -0,0 +1,273 @@ +""" +CallMiner Connector for Fivetran SDK. + +Main entry point for the connector that orchestrates authentication, +job creation, polling, and data synchronization. +""" + +import json +from typing import Dict, Any +from datetime import datetime, timedelta + +# Import required classes from fivetran_connector_sdk +from fivetran_connector_sdk import Connector + +# For enabling Logs in your connector code +from fivetran_connector_sdk import Logging as log + +# For supporting Data operations like upsert(), update(), delete() and checkpoint() +from fivetran_connector_sdk import Operations as op + +# Import from local modules +from auth import get_token +from config import validate_configuration, parse_configuration +from state import update_data_type_state +from sync import ( + determine_sync_strategy, + poll_and_process_single_job, + sync_with_last_n_hours, + sync_incremental_periods, +) +from api_client import delete_job + + +def schema(configuration: Dict[str, Any]): + """ + Define the schema function which lets you configure the schema your connector delivers. + See the technical reference documentation for more details on the schema function: + https://fivetran.com/docs/connectors/connector-sdk/technical-reference#schema + Args: + configuration: a dictionary that holds the configuration settings for the connector. 
+ """ + return [ + {"table": "ai_summaries", "primary_key": ["contact_id"]}, + {"table": "comments", "primary_key": ["comment_id"]}, + {"table": "contacts", "primary_key": ["id"]}, + {"table": "categories", "primary_key": ["contact_id", "category_id", "section_id"]}, + { + "table": "category_components", + "primary_key": ["contact_id", "category_id", "component_id", "start_time"], + }, + {"table": "events_delay", "primary_key": ["contact_id", "start_time", "end_time"]}, + {"table": "events_overtalk", "primary_key": ["contact_id", "start_time", "end_time"]}, + {"table": "events_redaction", "primary_key": ["contact_id", "start_time", "end_time"]}, + {"table": "events_silence", "primary_key": ["contact_id", "start_time", "end_time"]}, + {"table": "scores", "primary_key": ["contact_id", "score_id"]}, + { + "table": "score_indicators", + "primary_key": ["contact_id", "score_id", "score_component_id"], + }, + {"table": "tags", "primary_key": ["contact_id", "tag_id"]}, + {"table": "transcripts", "primary_key": ["contact_id", "start_time"]}, + ] + + +def update(configuration: Dict[str, Any], state: Dict[str, Any]): + """ + Define the update function, which is a required function, and is called by Fivetran during each sync. 
+ See the technical reference documentation for more details on the update function + https://fivetran.com/docs/connector-sdk/technical-reference/connector-sdk-code/connector-sdk-methods#update + Args: + configuration: A dictionary containing connection details + state: A dictionary containing state information from previous runs + The state dictionary is empty for the first sync or for any full re-sync + """ + log.warning("Example: API Connector : CallMiner Connector") + + # Validate configuration + validate_configuration(configuration) + + # Parse configuration values + config = parse_configuration(configuration) + + log.info(f"Processing data types: {', '.join(config['data_types'])}") + + # Get access token for this sync run + bearer_token, token_expires_at = get_token(config["client_id"], config["client_secret"]) + + # Check for pending job from previous run (resume if timed out) + if "pending_job" in state: + pending = state["pending_job"] + log.info("=" * 60) + log.info("RESUMING: Found pending job from previous run") + log.info(f"Job ID: {pending['job_id']}") + log.info(f"Data types: {pending['data_types']}") + log.info(f"Date range: {pending.get('start_date')} " f"to {pending.get('end_date')}") + log.info("=" * 60) + + try: + result = poll_and_process_single_job( + job_id=pending["job_id"], + data_types_str=pending["data_types"], + client_id=config["client_id"], + client_secret=config["client_secret"], + bearer_token=bearer_token, + token_expires_at=token_expires_at, + state=state, + start_date=pending.get("start_date"), + end_date=pending.get("end_date"), + max_records=config["max_records"], + max_threads=config["max_threads"], + max_polls=config["max_polls"], + poll_interval=60, + ) + bearer_token, token_expires_at, processed_job_id = result + + # Clear pending job on success + del state["pending_job"] + + # Update last_synced_date for each data type to the end_date + # Add 1 second to move to the next period start + if pending.get("end_date"): + # Handle 
format: 2025-11-03T23:59:59.000Z or 2025-11-03T23:59:59Z + end_date_str = pending["end_date"].replace(".000Z", "Z") + end_dt = datetime.strptime(end_date_str, "%Y-%m-%dT%H:%M:%SZ") + next_start = end_dt + timedelta(seconds=1) + next_start_str = next_start.strftime("%Y-%m-%dT%H:%M:%S.000Z") + + data_type_list = pending["data_types"].split(",") + for dt in data_type_list: + update_data_type_state(state, dt.strip(), next_start_str) + log.info(f"Updated last_synced_date to {next_start_str} " f"for resumed job") + + # Save the progress by checkpointing the state. This is important for ensuring that the sync process can resume + # from the correct position in case of next sync or interruptions. + # You should checkpoint even if you are not using incremental sync, as it tells Fivetran it is safe to write to destination. + # For large datasets, checkpoint regularly (e.g., every N records) not only at the end. + # Learn more about how and where to checkpoint by reading our best practices documentation + # (https://fivetran.com/docs/connector-sdk/best-practices#optimizingperformancewhenhandlinglargedatasets). 
+ op.checkpoint(state=state) + + # Delete job after checkpoint + delete_job(processed_job_id, bearer_token) + + log.info("Successfully resumed and completed pending job") + # DON'T return - continue with normal sync to catch up + + except TimeoutError: + # Job still not done, keep it in state and exit + log.error("Pending job still not complete after resuming") + raise + except Exception as e: + # Other error - clear pending job and let it fail + log.error(f"Error resuming pending job: {e}") + if "pending_job" in state: + del state["pending_job"] + raise + + # TESTING MODE: If test_job_id is provided, skip job creation + if config.get("test_job_id"): + log.info("=" * 60) + log.info("TESTING MODE: Using existing job ID") + log.info("=" * 60) + + # Poll and process the test job + result = poll_and_process_single_job( + job_id=config["test_job_id"], + data_types_str=",".join(config["data_types"]), + client_id=config["client_id"], + client_secret=config["client_secret"], + bearer_token=bearer_token, + token_expires_at=token_expires_at, + state=state, + max_records=config["max_records"], + max_threads=config["max_threads"], + max_polls=config["max_polls"], + poll_interval=30, + ) + bearer_token, token_expires_at, processed_job_id = result + + # Note: In testing mode, we don't delete the job + # so you can test repeatedly + log.info("Testing mode completed successfully " "(job not deleted for reuse)") + return + + # Calculate threshold: increment_days converted to hours + threshold_hours = config["increment_days"] * 24 + + # Group data types by sync strategy + # Key: (use_last_n_hours, start_timestamp, last_n_hours) + sync_groups = {} + + for data_type in config["data_types"]: + # Determine sync strategy for this data type + use_last_n_hours, current_start, last_n_hours = determine_sync_strategy( + state, data_type, threshold_hours, config["initial_start_date"] + ) + + # Create grouping key + if use_last_n_hours: + group_key = ("last_n_hours", None, last_n_hours) + else: 
+ group_key = ("incremental", current_start.isoformat(), 0) + + if group_key not in sync_groups: + sync_groups[group_key] = { + "data_types": [], + "use_last_n_hours": use_last_n_hours, + "current_start": current_start, + "last_n_hours": last_n_hours, + } + + sync_groups[group_key]["data_types"].append(data_type) + + # Process each group + for group_key, group_info in sync_groups.items(): + data_types = group_info["data_types"] + log.info(f"Syncing batch of {len(data_types)} data type(s): " f"{', '.join(data_types)}") + + if group_info["use_last_n_hours"]: + # Recent sync - use LastNHours for efficiency + bearer_token, token_expires_at = sync_with_last_n_hours( + client_id=config["client_id"], + client_secret=config["client_secret"], + bearer_token=bearer_token, + token_expires_at=token_expires_at, + data_type=data_types, # Now accepts list + last_n_hours=group_info["last_n_hours"], + email_recipients=config["email_recipients"], + max_records=config["max_records"], + max_threads=config["max_threads"], + max_polls=config["max_polls"], + state=state, + ) + else: + # Initial sync or large gap - use incremental date range logic + bearer_token, token_expires_at = sync_incremental_periods( + client_id=config["client_id"], + client_secret=config["client_secret"], + bearer_token=bearer_token, + token_expires_at=token_expires_at, + data_type=data_types, # Now accepts list + current_start=group_info["current_start"], + increment_days=config["increment_days"], + email_recipients=config["email_recipients"], + max_records=config["max_records"], + max_threads=config["max_threads"], + max_polls=config["max_polls"], + state=state, + ) + + log.info(f"Completed sync for batch: {', '.join(data_types)}") + + +# Create the connector object using the schema and update functions +connector = Connector(update=update, schema=schema) + +# Check if the script is being run as the main module. 
+# This is Python's standard entry method allowing your script to be run directly from the command line or IDE 'run' button. +# +# IMPORTANT: The recommended way to test your connector is using the Fivetran debug command: +# fivetran debug +# +# This local testing block is provided as a convenience for quick debugging during development, +# such as using IDE debug tools (breakpoints, step-through debugging, etc.). +# Note: This method is not called by Fivetran when executing your connector in production. +# Always test using 'fivetran debug' prior to finalizing and deploying your connector. +if __name__ == "__main__": + # Open the configuration.json file and load its contents + with open("configuration.json", "r") as f: + configuration = json.load(f) + + # Test the connector locally + connector.debug(configuration=configuration) diff --git a/callminer/file_processing.py b/callminer/file_processing.py new file mode 100644 index 0000000..5f2d976 --- /dev/null +++ b/callminer/file_processing.py @@ -0,0 +1,546 @@ +""" +File download and processing functions for CallMiner exports. +""" + +import requests +import zipfile +import gzip +import json +import csv +import io +import os +import tempfile +import threading +from typing import Dict, Any, Tuple, List +from collections import defaultdict +from concurrent.futures import ThreadPoolExecutor, as_completed +from fivetran_connector_sdk import Operations as op, Logging as log +from auth import retry_on_500_error + +# Thread-safe error statistics tracking +_ERROR_STATS = defaultdict(int) +_ERROR_STATS_LOCK = threading.Lock() + + +def _cleanup_materialized_csv_files(materialized_files: List[Dict[str, Any]]) -> None: + """ + Remove temporary CSV files created by worker threads. + + Args: + materialized_files: List of materialized CSV metadata dictionaries. 
+ """ + for materialized_file in materialized_files: + file_path = materialized_file.get("file_path") + if not file_path: + continue + + try: + if os.path.exists(file_path): + os.remove(file_path) + except OSError as e: + log.warning(f"Unable to remove temporary file {file_path}: {e}") + + +def log_error_statistics() -> None: + """ + Log the error statistics collected during file processing. + This helps with monitoring and debugging connector behavior. + """ + with _ERROR_STATS_LOCK: + if _ERROR_STATS: + log.warning("File Processing Error Statistics:") + for error_type, count in _ERROR_STATS.items(): + log.warning(f" {error_type}: {count}") + else: + log.info("No file processing errors encountered") + + +def reset_error_statistics() -> None: + """ + Reset error statistics. Called at the start of each job's processing. + """ + with _ERROR_STATS_LOCK: + _ERROR_STATS.clear() + + +@retry_on_500_error(max_retries=3, initial_delay=1, backoff_factor=2) +def download_and_stream_file(download_endpoint: str, bearer_token: str) -> io.BytesIO: + """ + Download a file from the CallMiner Bulk Export API and return it as a stream. 
+ + Args: + download_endpoint: The full download endpoint URL or download ID + bearer_token: Bearer token for authentication + + Returns: + io.BytesIO: In-memory buffer holding the full file content, positioned at the start + """ + # If just an ID is provided, construct the full URL + if not download_endpoint.startswith("http"): + url = f"https://api.callminer.net/bulkexport/api/download/{download_endpoint}" + else: + url = download_endpoint + + headers = {"Authorization": f"Bearer {bearer_token}"} + + log.info(f"Downloading file from: {url}") + + try: + # Longer timeout for file downloads (30s connect, 10min read) + response = requests.get(url, headers=headers, stream=True, timeout=(30, 600)) + response.raise_for_status() + + try: + # Read the response in chunks into an in-memory buffer + file_stream = io.BytesIO() + total_size = 0 + + for chunk in response.iter_content(chunk_size=8192): + if chunk: + file_stream.write(chunk) + total_size += len(chunk) + + file_stream.seek(0) # Reset to beginning for reading + log.info(f"Successfully downloaded file, size: {total_size} bytes") + return file_stream + finally: + response.close() + + except requests.exceptions.RequestException as e: + log.error(f"Error downloading file: {e}") + raise + + +def parse_table_name_from_filename(filename: str) -> str: + """ + Extract and normalize table name from filename. 
+ + Handles formats like: + - UUID_TableName.csv.gz -> "tablename" + - UUID_TableName.csv.zip -> "tablename" + - TableName.csv -> "tablename" + + Args: + filename: Filename to parse (may include UUID prefix and extensions) + + Returns: + Normalized table name in lowercase, with spaces replaced by underscores + """ + # Remove the known archive and CSV extensions + base = ( + filename.replace(".csv.gz", "") + .replace(".csv.zip", "") + .replace(".gz", "") + .replace(".zip", "") + .replace(".csv", "") + ) + + # Split by underscore and skip UUID if present + parts = base.split("_") + if len(parts) > 1 and "-" in parts[0]: + # UUID_TableName format - skip UUID (first part) + return "_".join(parts[1:]).lower().replace(" ", "_") + else: + # Simple filename - normalize the whole base name + return base.lower().replace(" ", "_") + + +def process_csv_stream( + csv_reader: csv.DictReader, table_name: str, max_records: int = None +) -> int: + """ + Process a CSV stream and upsert its records. + + Args: + csv_reader: CSV DictReader instance + table_name: Target table name for upserting + max_records: Maximum records to process (None for unlimited) + + Returns: + Number of records processed + """ + record_count = 0 + log_interval = 1000000 # Log every million records + + for row in csv_reader: + if max_records and record_count >= max_records: + log.info(f"Reached limit of {max_records} records for {table_name}") + break + + # The 'upsert' operation is used to insert or update data in the destination table. + # The first argument is the name of the destination table. + # The second argument is a dictionary containing the record to be upserted. 
+ op.upsert(table=table_name, data=row) + record_count += 1 + + # Log progress every million records + if record_count % log_interval == 0: + log.info(f"Processing {table_name}: {record_count:,} records processed") + + return record_count + + +def materialize_csv_stream( + csv_reader: csv.DictReader, + table_name: str, + source_name: str, + order_key: Tuple[int, int], + max_records: int = None, +) -> Dict[str, Any]: + """ + Write parsed CSV rows to a temporary CSV file without calling SDK operations. + + Args: + csv_reader: CSV DictReader instance + table_name: Target table name for later upserting + source_name: Source file name for logging + order_key: Stable sort key for main-thread upsert ordering + max_records: Maximum records to materialize (None for unlimited) + + Returns: + Dictionary with temporary file metadata and record count + """ + temp_file = tempfile.NamedTemporaryFile( + mode="w", + newline="", + encoding="utf-8", + delete=False, + prefix="callminer_", + suffix=".csv", + ) + record_count = 0 + + try: + fieldnames = csv_reader.fieldnames + if not fieldnames: + log.warning(f"No CSV headers found in {source_name}") + return { + "file_path": temp_file.name, + "table_name": table_name, + "source_name": source_name, + "record_count": record_count, + "order_key": order_key, + } + + writer = csv.DictWriter(temp_file, fieldnames=fieldnames, extrasaction="ignore") + writer.writeheader() + + for row in csv_reader: + if max_records and record_count >= max_records: + log.info(f"Reached limit of {max_records} records for {table_name}") + break + + writer.writerow(row) + record_count += 1 + + return { + "file_path": temp_file.name, + "table_name": table_name, + "source_name": source_name, + "record_count": record_count, + "order_key": order_key, + } + except Exception: + temp_file.close() + _cleanup_materialized_csv_files([{"file_path": temp_file.name}]) + raise + finally: + if not temp_file.closed: + temp_file.close() + + +def 
upsert_materialized_csv_file(materialized_file: Dict[str, Any]) -> int: + """ + Upsert rows from a materialized CSV file on the main thread. + + Args: + materialized_file: Temporary CSV metadata dictionary + + Returns: + Number of records upserted + """ + table_name = materialized_file["table_name"] + file_path = materialized_file["file_path"] + + with open(file_path, "r", newline="", encoding="utf-8") as csv_file: + csv_reader = csv.DictReader(csv_file) + return process_csv_stream(csv_reader, table_name) + + +def process_single_nested_file( + file_data: bytes, + filename: str, + table_name: str, + file_order: int, + max_records: int = None, +) -> Tuple[str, List[Dict[str, Any]]]: + """ + Extract a single gzip or zip file to temporary CSV files. + + Args: + file_data: Raw bytes of the file + filename: Filename for logging + table_name: Target table name + file_order: Stable order of the nested file in the outer zip + max_records: Maximum records to process (None for no limit) + + Returns: + Tuple of (filename, materialized CSV metadata list) + """ + materialized_files = [] + + try: + thread_name = threading.current_thread().name + log.info(f"[{thread_name}] Processing file: {filename} -> {table_name}") + + file_stream = io.BytesIO(file_data) + + # Handle .gz files + if filename.endswith(".gz"): + with gzip.open(file_stream, "rt", encoding="utf-8") as gzf: + csv_reader = csv.DictReader(gzf) + materialized_files.append( + materialize_csv_stream( + csv_reader, + table_name, + filename, + (file_order, 0), + max_records, + ) + ) + + # Handle .zip files + elif filename.endswith(".zip"): + with zipfile.ZipFile(file_stream) as inner_zip: + inner_files = inner_zip.namelist() + + # Process all CSV files in the nested zip + for inner_file_order, inner_file in enumerate(inner_files): + if not inner_file.endswith(".csv"): + continue + + csv_table_name = parse_table_name_from_filename(inner_file) + + with inner_zip.open(inner_file) as csvf: + with io.TextIOWrapper(csvf, 
encoding="utf-8") as text_stream: + csv_reader = csv.DictReader(text_stream) + materialized_files.append( + materialize_csv_stream( + csv_reader, + csv_table_name, + inner_file, + (file_order, inner_file_order), + max_records, + ) + ) + + thread_name = threading.current_thread().name + record_count = sum(file_info["record_count"] for file_info in materialized_files) + log.info(f"[{thread_name}] Completed {filename}: {record_count} records") + return filename, materialized_files + + except gzip.BadGzipFile as e: + thread_name = threading.current_thread().name + _cleanup_materialized_csv_files(materialized_files) + with _ERROR_STATS_LOCK: + _ERROR_STATS["gzip_decompression_errors"] += 1 + log.error(f"[{thread_name}] Gzip decompression error in {filename}: {e}") + raise + except zipfile.BadZipFile as e: + thread_name = threading.current_thread().name + _cleanup_materialized_csv_files(materialized_files) + with _ERROR_STATS_LOCK: + _ERROR_STATS["zip_decompression_errors"] += 1 + log.error(f"[{thread_name}] Zip decompression error in {filename}: {e}") + raise + except csv.Error as e: + thread_name = threading.current_thread().name + _cleanup_materialized_csv_files(materialized_files) + with _ERROR_STATS_LOCK: + _ERROR_STATS["csv_parsing_errors"] += 1 + log.error(f"[{thread_name}] CSV parsing error in {filename}: {e}") + raise + except UnicodeDecodeError as e: + thread_name = threading.current_thread().name + _cleanup_materialized_csv_files(materialized_files) + with _ERROR_STATS_LOCK: + _ERROR_STATS["encoding_errors"] += 1 + log.error(f"[{thread_name}] Encoding error in {filename}: {e}") + raise + except Exception as e: + thread_name = threading.current_thread().name + _cleanup_materialized_csv_files(materialized_files) + with _ERROR_STATS_LOCK: + _ERROR_STATS["unexpected_file_errors"] += 1 + log.error(f"[{thread_name}] Unexpected error processing {filename}: {e}") + raise + + +def process_multi_type_zip_file( + zip_stream: io.BytesIO, + download_id: str, + 
data_types_str: str, + state: Dict[str, Any], + max_records: int = None, + max_threads: int = 8, +) -> None: + """ + Process a zip file that may contain data for multiple data types. + Automatically detects table names from file names. + Uses multi-threading to extract nested files, then upserts on the main thread. + + Args: + zip_stream: Zip file content as BytesIO stream + download_id: ID of the downloaded file for logging + data_types_str: Comma-separated string of data types + state: State dictionary for checkpointing + max_records: Maximum records to process per file (None for no limit) + max_threads: Maximum number of threads for parallel extraction + (default: 8) + """ + # Reset error statistics for this job + reset_error_statistics() + + try: + # Open the outer zip file + with zipfile.ZipFile(zip_stream) as outer_zip: + file_list = outer_zip.namelist() + log.info(f"Files in outer zip: {file_list}") + + # Find JSON metadata file + json_file = None + nested_files = [] + + for filename in file_list: + if filename.endswith(".json"): + json_file = filename + elif filename.endswith(".zip") or filename.endswith(".gz"): + nested_files.append(filename) + + # Read metadata (optional) + if json_file: + with outer_zip.open(json_file) as jf: + metadata = json.load(jf) + if isinstance(metadata, dict): + metadata_keys = ", ".join(metadata.keys()) + log.info(f"Metadata loaded for file: {json_file} ({metadata_keys})") + else: + log.info(f"Metadata loaded for file: {json_file}") + + # If no nested files found, log error + if not nested_files: + log.error(f"No nested compressed files found in {download_id}") + return + + log.info( + f"Extracting {len(nested_files)} files with up to " f"{max_threads} worker threads" + ) + + # Extract all nested files into memory first + # (ZipFile objects aren't thread-safe) + file_tasks = [] + for file_order, nested_file in enumerate(nested_files): + table_name = parse_table_name_from_filename(nested_file) + with outer_zip.open(nested_file) 
as ncf: + file_data = ncf.read() + file_tasks.append( + { + "file_data": file_data, + "filename": nested_file, + "table_name": table_name, + "file_order": file_order, + "file_size": len(file_data), + } + ) + + # Sort by file size (largest first) to process large files early + # This prevents large files from blocking after small files finish + file_tasks.sort(key=lambda task: task["file_size"], reverse=True) + + if file_tasks: + largest_mb = file_tasks[0]["file_size"] / (1024 * 1024) + smallest_mb = file_tasks[-1]["file_size"] / (1024 * 1024) + log.info( + f"Extracting {len(file_tasks)} files using up to " + f"{max_threads} threads (largest: {largest_mb:.1f} MB, " + f"smallest: {smallest_mb:.1f} MB)" + ) + else: + log.info( + f"Extracting {len(file_tasks)} files using up to " f"{max_threads} threads" + ) + + # Extract/decompress files in parallel, but keep SDK operations on the main thread. + materialized_files = [] + total_records = 0 + errors = [] + + with ThreadPoolExecutor(max_workers=max_threads) as executor: + # Submit all tasks + future_to_file = { + executor.submit( + process_single_nested_file, + task["file_data"], + task["filename"], + task["table_name"], + task["file_order"], + max_records, + ): task["filename"] + for task in file_tasks + } + + # Collect results as they complete + for future in as_completed(future_to_file): + filename = future_to_file[future] + try: + result_filename, result_files = future.result() + materialized_files.extend(result_files) + extracted_records = sum( + file_info["record_count"] for file_info in result_files + ) + log.info( + f"Completed extraction for {result_filename}: " + f"{extracted_records} records" + ) + + except Exception as e: + error_msg = f"Failed to process {filename}: {e}" + log.error(error_msg) + errors.append(error_msg) + + if errors: + _cleanup_materialized_csv_files(materialized_files) + log.error( + f"Completed with {len(errors)} error(s). 
" + f"Total records processed: {total_records}" + ) + raise RuntimeError(f"Errors during parallel processing: {errors}") + + try: + for materialized_file in sorted( + materialized_files, key=lambda file_info: file_info["order_key"] + ): + record_count = upsert_materialized_csv_file(materialized_file) + total_records += record_count + log.info( + f"Completed upsert for {materialized_file['source_name']}: " + f"{record_count} records" + ) + finally: + _cleanup_materialized_csv_files(materialized_files) + + log.info(f"Successfully processed all files. " f"Total records: {total_records}") + + except zipfile.BadZipFile as e: + with _ERROR_STATS_LOCK: + _ERROR_STATS["outer_zip_errors"] += 1 + log.error(f"Invalid outer zip file: {e}") + raise + except Exception as e: + with _ERROR_STATS_LOCK: + _ERROR_STATS["outer_processing_errors"] += 1 + log.error(f"Error processing zip file: {e}") + raise + finally: + # Always log error statistics at the end + log_error_statistics() diff --git a/callminer/state.py b/callminer/state.py new file mode 100644 index 0000000..b34e713 --- /dev/null +++ b/callminer/state.py @@ -0,0 +1,38 @@ +""" +State management utilities for tracking sync progress. +""" + +from typing import Dict, Any + + +def get_data_type_state(state: Dict[str, Any], data_type: str) -> Dict[str, Any]: + """ + Get state for a specific data type. + + Args: + state: Overall state dictionary + data_type: Data type name (e.g., 'Contacts', 'Transcripts') + + Returns: + State dictionary for the specific data type + """ + if "data_types" not in state: + state["data_types"] = {} + + if data_type not in state["data_types"]: + state["data_types"][data_type] = {} + + return state["data_types"][data_type] + + +def update_data_type_state(state: Dict[str, Any], data_type: str, last_synced_date: str) -> None: + """ + Update the last synced date for a specific data type. 
+ + Args: + state: Overall state dictionary + data_type: Data type name + last_synced_date: ISO format timestamp + """ + dt_state = get_data_type_state(state, data_type) + dt_state["last_synced_date"] = last_synced_date diff --git a/callminer/sync.py b/callminer/sync.py new file mode 100644 index 0000000..744be31 --- /dev/null +++ b/callminer/sync.py @@ -0,0 +1,445 @@ +""" +Sync orchestration and job polling logic for CallMiner exports. +""" + +import time +from typing import Dict, Any, Tuple +from datetime import datetime, timedelta +from fivetran_connector_sdk import Operations as op, Logging as log +from api_client import create_job, get_jobs_history, check_job_status, delete_job +from file_processing import download_and_stream_file, process_multi_type_zip_file +from state import get_data_type_state, update_data_type_state +from auth import refresh_token_if_needed + + +def determine_sync_strategy( + state: Dict[str, Any], data_type: str, threshold_hours: int, initial_start_date_str: str +) -> Tuple[bool, datetime, int]: + """ + Determine sync strategy based on state and time gap for a specific data type. 
+ + Args: + state: Current state dictionary + data_type: Data type to check state for + threshold_hours: Threshold for using LastNHours vs date range + initial_start_date_str: Initial start date from configuration + + Returns: + Tuple of (use_last_n_hours, current_start, last_n_hours) + """ + current_time = datetime.utcnow() + dt_state = get_data_type_state(state, data_type) + + if dt_state.get("last_synced_date"): + last_synced = datetime.strptime(dt_state["last_synced_date"], "%Y-%m-%dT%H:%M:%S.%fZ") + + # Calculate hours since last sync + hours_diff = (current_time - last_synced).total_seconds() / 3600 + + # If gap is small enough, use LastNHours for efficiency + if hours_diff <= threshold_hours: + last_n_hours = int(hours_diff) + 1 # Round up to ensure overlap + + log.info( + f"[{data_type}] Recent sync detected: {hours_diff:.2f} hours " + f"since last sync ({dt_state['last_synced_date']}), " + f"using LastNHours={last_n_hours} " + f"(threshold: {threshold_hours} hours)" + ) + + return True, current_time, last_n_hours + else: + # Gap too large - use incremental date range logic + log.info( + f"[{data_type}] Large gap detected: {hours_diff:.2f} hours " + f"since last sync ({dt_state['last_synced_date']}), " + f"exceeds threshold of {threshold_hours} hours. " + f"Using incremental date range logic." + ) + return False, last_synced, 0 + else: + # No state - initial sync + log.info(f"[{data_type}] No previous sync detected. Starting initial sync.") + current_start = datetime.strptime(initial_start_date_str, "%Y-%m-%dT%H:%M:%S.%fZ") + return False, current_start, 0 + + +def handle_completed_job( + job_id: str, + data_types_str: str, + download_endpoint: str, + bearer_token: str, + state: Dict[str, Any], + max_records: int = None, + max_threads: int = 8, +): + """ + Download and process a completed job. 
+ + Args: + job_id: Job ID + data_types_str: Comma-separated data types string + download_endpoint: Endpoint URL for downloading + bearer_token: Bearer token for authentication + state: State dictionary for checkpointing + max_records: Maximum records to process (None for no limit) + max_threads: Maximum number of threads for parallel processing + """ + log.info(f"Job completed, downloading file for: {data_types_str}") + + file_stream = download_and_stream_file(download_endpoint, bearer_token) + + try: + process_multi_type_zip_file( + file_stream, job_id, data_types_str, state, max_records, max_threads + ) + log.info(f"Sync completed successfully for: {data_types_str}") + finally: + file_stream.close() + + +def poll_and_process_single_job( + job_id: str, + data_types_str: str, + client_id: str, + client_secret: str, + bearer_token: str, + token_expires_at: datetime, + state: Dict[str, Any], + start_date: str = None, + end_date: str = None, + max_records: int = None, + max_threads: int = 8, + max_polls: int = 60, + poll_interval: int = 60, +): + """ + Poll a single job and process when complete. 
+ + Args: + job_id: The job ID to poll + data_types_str: Comma-separated data types string + client_id: Client ID for authentication + client_secret: Client secret for authentication + bearer_token: Bearer token for authentication + token_expires_at: When current token expires + state: State dictionary + start_date: Start date of the job period (for resume logic) + end_date: End date of the job period (for resume logic) + max_records: Maximum records to process per file (None for no limit) + max_threads: Maximum number of threads for parallel processing + max_polls: Maximum number of polling attempts + poll_interval: Seconds between polls + + Returns: + Tuple of (updated_bearer_token, updated_expiration, job_id) + """ + log.info(f"Polling job {job_id} for: {data_types_str}") + + for poll_count in range(max_polls): + # Refresh token if needed before each poll + bearer_token, token_expires_at = refresh_token_if_needed( + client_id, client_secret, bearer_token, token_expires_at + ) + + log.info(f"Polling attempt {poll_count + 1}/{max_polls}") + + # Fetch jobs history + jobs_history = get_jobs_history(bearer_token) + + # Check job status + job_status = check_job_status(job_id, jobs_history=jobs_history) + + if not job_status["found"]: + log.warning(f"Job {job_id} not found in history yet") + time.sleep(poll_interval) + continue + + status = job_status["status"] + log.info(f"Job status: {status}") + + if status == "Completed": + download_endpoint = job_status.get("download_endpoint") + + if not download_endpoint: + log.error("Job completed but no download endpoint") + raise ValueError(f"No download endpoint for completed job {job_id}") + + # Download and process the completed job + log.info("Job completed, downloading and processing...") + handle_completed_job( + job_id, + data_types_str, + download_endpoint, + bearer_token, + state, + max_records, + max_threads, + ) + + log.info(f"Successfully processed job {job_id}") + return bearer_token, token_expires_at, job_id + + 
elif status == "Failed": + log.error(f"Job {job_id} failed") + raise ValueError(f"Export job failed: {job_id}") + + # Job still processing, wait before next poll + time.sleep(poll_interval) + + # Job didn't complete within timeout - store in state for resume + log.error( + f"Job {job_id} did not complete within timeout " + f"({max_polls * poll_interval / 60:.0f} minutes)" + ) + + # Store job info in state to resume on next sync + state["pending_job"] = { + "job_id": job_id, + "data_types": data_types_str, + "start_date": start_date, + "end_date": end_date, + "created_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"), + } + + # Save the progress by checkpointing the state. This is important for ensuring that the sync process can resume + # from the correct position in case of next sync or interruptions. + # You should checkpoint even if you are not using incremental sync, as it tells Fivetran it is safe to write to destination. + # For large datasets, checkpoint regularly (e.g., every N records) not only at the end. + # Learn more about how and where to checkpoint by reading our best practices documentation + # (https://fivetran.com/docs/connector-sdk/best-practices#optimizingperformancewhenhandlinglargedatasets). + op.checkpoint(state=state) + + raise TimeoutError( + f"Job {job_id} timed out after " + f"{max_polls * poll_interval / 60:.0f} minutes. " + f"Stored in state for resume on next sync." + ) + + +def sync_with_last_n_hours( + client_id: str, + client_secret: str, + bearer_token: str, + token_expires_at: datetime, + data_type, # Can be str or list of str + last_n_hours: int, + email_recipients: list, + max_records: int, + max_threads: int, + max_polls: int, + state: Dict[str, Any], +) -> Tuple[str, datetime]: + """ + Sync using LastNHours strategy for recent data. 
+ + Args: + client_id: Client ID for authentication + client_secret: Client secret for authentication + bearer_token: Bearer token for authentication + token_expires_at: When current token expires + data_type: Data type string or list of data types to export + last_n_hours: Number of hours to look back + email_recipients: List of email addresses for notifications + max_records: Maximum records to process (None for no limit) + max_threads: Maximum number of threads for parallel processing + max_polls: Maximum number of polling attempts + state: State dictionary to update + + Returns: + Tuple of (updated_bearer_token, updated_expiration) + """ + # Refresh token if needed + bearer_token, token_expires_at = refresh_token_if_needed( + client_id, client_secret, bearer_token, token_expires_at + ) + + # Normalize to list + data_types = [data_type] if isinstance(data_type, str) else data_type + + # Create job using LastNHours + job_response = create_job( + bearer_token=bearer_token, + last_n_hours=last_n_hours, + data_types=data_types, + email_recipients=email_recipients, + ) + job_id = job_response["job_id"] + log.info( + f"[{', '.join(data_types)}] Job created with ID: {job_id} " f"(LastNHours={last_n_hours})" + ) + + # Calculate approximate date range for state tracking + current_time = datetime.utcnow() + end_date_str = current_time.strftime("%Y-%m-%dT%H:%M:%S.000Z") + start_time = current_time - timedelta(hours=last_n_hours) + start_date_str = start_time.strftime("%Y-%m-%dT%H:%M:%S.000Z") + + # Poll and process this job + result = poll_and_process_single_job( + job_id=job_id, + data_types_str=",".join(data_types), + client_id=client_id, + client_secret=client_secret, + bearer_token=bearer_token, + token_expires_at=token_expires_at, + state=state, + start_date=start_date_str, + end_date=end_date_str, + max_records=max_records, + max_threads=max_threads, + max_polls=max_polls, + poll_interval=60, + ) + bearer_token, token_expires_at, processed_job_id = result + + # 
Update state for all data types in this batch + sync_timestamp = end_date_str + + for dt in data_types: + update_data_type_state(state, dt, sync_timestamp) + + # Save the progress by checkpointing the state. This is important for ensuring that the sync process can resume + # from the correct position in case of next sync or interruptions. + # You should checkpoint even if you are not using incremental sync, as it tells Fivetran it is safe to write to destination. + # For large datasets, checkpoint regularly (e.g., every N records) not only at the end. + # Learn more about how and where to checkpoint by reading our best practices documentation + # (https://fivetran.com/docs/connector-sdk/best-practices#optimizingperformancewhenhandlinglargedatasets). + op.checkpoint(state=state) + + # Delete job after checkpoint + delete_job(processed_job_id, bearer_token) + + log.info(f"[{', '.join(data_types)}] Successfully synced using LastNHours") + + return bearer_token, token_expires_at + + +def sync_incremental_periods( + client_id: str, + client_secret: str, + bearer_token: str, + token_expires_at: datetime, + data_type, # Can be str or list of str + current_start: datetime, + increment_days: int, + email_recipients: list, + max_records: int, + max_threads: int, + max_polls: int, + state: Dict[str, Any], +) -> Tuple[str, datetime]: + """ + Sync using incremental date range strategy. 
+ + Args: + client_id: Client ID for authentication + client_secret: Client secret for authentication + bearer_token: Bearer token for authentication + token_expires_at: When current token expires + data_type: Data type string or list of data types to export + current_start: Start datetime for sync + increment_days: Number of days per sync period + email_recipients: List of email addresses for notifications + max_records: Maximum records to process (None for no limit) + max_threads: Maximum number of threads for parallel processing + max_polls: Maximum number of polling attempts + state: State dictionary to update + + Returns: + Tuple of (updated_bearer_token, updated_expiration) + """ + # Normalize to list + data_types = [data_type] if isinstance(data_type, str) else data_type + data_types_str = ", ".join(data_types) + + current_time = datetime.utcnow() + + log.info( + f"[{data_types_str}] Starting from: " f"{current_start.strftime('%Y-%m-%dT%H:%M:%S.%fZ')}" + ) + log.info(f"[{data_types_str}] Using {increment_days}-day sync periods") + + # Process data in configurable day increments + period_count = 0 + + while current_start < current_time: + # Refresh token if needed before each job + bearer_token, token_expires_at = refresh_token_if_needed( + client_id, client_secret, bearer_token, token_expires_at + ) + + # Calculate end time for this period + # End at 23:59:59 of the final day (subtract 1 to stay within range) + current_end = current_start + timedelta(days=increment_days - 1) + current_end = current_end.replace(hour=23, minute=59, second=59, microsecond=0) + + # Don't go beyond current time + if current_end > current_time: + current_end = current_time + + # Format dates for API + start_date_str = current_start.strftime("%Y-%m-%dT%H:%M:%S.000Z") + end_date_str = current_end.strftime("%Y-%m-%dT%H:%M:%S.000Z") + + period_count += 1 + log.info( + f"[{data_types_str}] Processing {increment_days}-day " + f"period #{period_count}: {start_date_str} to {end_date_str}" 
+ ) + + # Create job for this period + job_response = create_job( + bearer_token=bearer_token, + start_date=start_date_str, + end_date=end_date_str, + data_types=data_types, + email_recipients=email_recipients, + ) + job_id = job_response["job_id"] + log.info(f"[{data_types_str}] Job created with ID: {job_id}") + + # Poll and process this job + result = poll_and_process_single_job( + job_id=job_id, + data_types_str=",".join(data_types), + client_id=client_id, + client_secret=client_secret, + bearer_token=bearer_token, + token_expires_at=token_expires_at, + state=state, + start_date=start_date_str, + end_date=end_date_str, + max_records=max_records, + max_threads=max_threads, + max_polls=max_polls, + poll_interval=60, + ) + bearer_token, token_expires_at, processed_job_id = result + + # Update state for all data types in this batch + for dt in data_types: + update_data_type_state(state, dt, end_date_str) + + # Save progress by checkpointing the state. This ensures that the sync can resume + # from the correct position on the next sync or after an interruption. + # You should checkpoint even if you are not using incremental sync, as it tells Fivetran it is safe to write to the destination. + # For large datasets, checkpoint regularly (e.g., every N records), not only at the end. + # Learn more about how and where to checkpoint by reading our best practices documentation + # (https://fivetran.com/docs/connector-sdk/best-practices#optimizingperformancewhenhandlinglargedatasets). + op.checkpoint(state=state) + + # Delete job after checkpoint + delete_job(processed_job_id, bearer_token) + + log.info(f"[{data_types_str}] Successfully synced period ending: " f"{end_date_str}") + + # Move to next period (start from next second after end) + current_start = current_end + timedelta(seconds=1) + + log.info( + f"[{data_types_str}] Incremental sync completed. 
" + f"Processed {period_count} {increment_days}-day period(s)" + ) + + return bearer_token, token_expires_at