1 change: 1 addition & 0 deletions README.md
@@ -85,6 +85,7 @@ For SDK installation and setup, visit the main [Fivetran Connector SDK repositor
- **[amazon_video_central](https://github.com/fivetran/community_connectors/tree/main/amazon_video_central)** - Sync report data from Amazon Video Central API
- **[awardco](https://github.com/fivetran/community_connectors/tree/main/awardco)** - Sync data from Awardco rewards platform
- **[betterstack](https://github.com/fivetran/community_connectors/tree/main/betterstack)** - Sync uptime monitoring data from Better Stack
- **[callminer](https://github.com/fivetran/fivetran_csdk_connectors/tree/main/callminer)** - Sync CallMiner Bulk Export data using OAuth2 authentication, export job polling, archive extraction, and per-data-type incremental state tracking
- **[checkly](https://github.com/fivetran/community_connectors/tree/main/checkly)** - Sync monitoring check data and analytics from Checkly
- **[clerk](https://github.com/fivetran/community_connectors/tree/main/clerk)** - Sync user data from Clerk authentication
- **[commonpaper](https://github.com/fivetran/community_connectors/tree/main/commonpaper)** - Sync agreement data from Common Paper
153 changes: 153 additions & 0 deletions callminer/README.md
@@ -0,0 +1,153 @@
# CallMiner Connector Example

## Connector overview

The CallMiner connector for Fivetran uses the CallMiner Bulk Export API to create export jobs, poll job status, download completed export archives, extract nested compressed CSV files, and sync the exported records to your destination.

## Requirements

- [Supported Python versions](https://github.com/fivetran/fivetran_csdk_connectors/blob/main/README.md#requirements)
- Operating system:
  - Windows: 10 or later (64-bit only)
  - macOS: 13 (Ventura) or later (Apple Silicon [arm64] or Intel [x86_64])
  - Linux: Distributions such as Ubuntu 20.04 or later, Debian 10 or later, or Amazon Linux 2 or later (arm64 or x86_64)

## Getting started

Refer to the [Connector SDK Setup Guide](https://fivetran.com/docs/connectors/connector-sdk/setup-guide) to get started.

To initialize a new Connector SDK project using this connector as a starting point, run:

```bash
fivetran init --template callminer
```

`fivetran init` initializes a new Connector SDK project by setting up the project structure, configuration files, and a connector you can run immediately with `fivetran debug`. For more information on `fivetran init`, refer to the [Connector SDK `init` documentation](https://fivetran.com/docs/connector-sdk/connector-development-and-configuration/connector-sdk-commands#fivetraninit).

> Note: Ensure you have updated the `configuration.json` file with the necessary parameters before running `fivetran debug`. See the [Configuration file](#configuration-file) section for details on the required configuration parameters.

## Features

- OAuth2 client credentials authentication with automatic token refresh.
- Bulk export job creation, polling, download, and cleanup.
- Incremental sync windows using a configurable `initial_start_date` and `increment_days`.
- Recent sync optimization using CallMiner's `LastNHours` export option when state is close to current time.
- Per-data-type state tracking through the `data_types` state object.
- Pending job recovery through the `pending_job` state object when an export does not finish within the configured polling window.
- Nested archive handling for outer ZIP files with inner `.gz` or `.zip` CSV payloads.
- Parallel nested archive extraction with a configurable `max_threads` value. Fivetran SDK operations run on the main thread.
- Optional `max_records` and `test_job_id` settings for local testing.
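
The choice between an incremental date window and the `LastNHours` option can be sketched as follows. This is a minimal illustration, not the connector's actual logic: the `plan_export` helper and the 24-hour `recent_hours` threshold are assumptions, since the README does not state how close to the current time the state must be. The returned keys mirror the `create_job` keyword arguments.

```python
import math
from datetime import datetime, timedelta, timezone

TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.000Z"


def plan_export(last_synced, now, increment_days=10, recent_hours=24):
    """Return keyword arguments describing one export request (hypothetical helper)."""
    gap = now - last_synced
    if gap <= timedelta(hours=recent_hours):
        # State is close to the current time: request the last N hours, rounded up.
        hours = max(1, math.ceil(gap.total_seconds() / 3600))
        return {"last_n_hours": hours}
    # Otherwise request one catch-up window, capped at the current time.
    window_end = min(last_synced + timedelta(days=increment_days), now)
    return {
        "start_date": last_synced.strftime(TIMESTAMP_FORMAT),
        "end_date": window_end.strftime(TIMESTAMP_FORMAT),
    }
```

Capping the window at the current time keeps the final catch-up window from extending into the future.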

## Configuration file

The connector requires the following configuration parameters in the `configuration.json` file. This configuration is uploaded to Fivetran and defines how the connector authenticates with CallMiner and selects Bulk Export data.

```json
{
  "client_id": "<YOUR_CLIENT_ID>",
  "client_secret": "<YOUR_CLIENT_SECRET>",
  "initial_start_date": "<YOUR_INITIAL_START_DATE>",
  "increment_days": "<YOUR_INCREMENT_DAYS>",
  "max_threads": "<YOUR_MAX_THREADS>",
  "max_polls": "<YOUR_MAX_POLLS>",
  "email_recipients": "<YOUR_EMAIL_RECIPIENTS>",
  "data_types": "<YOUR_DATA_TYPES>"
}
```

Required parameters:

- `client_id`: CallMiner OAuth2 client ID.
- `client_secret`: CallMiner OAuth2 client secret.
- `initial_start_date`: Starting timestamp for the first sync, formatted as `YYYY-MM-DDTHH:MM:SS.000Z`.
- `data_types`: Comma-separated list of CallMiner Bulk Export data types to request.

Optional parameters:

- `increment_days`: Number of days per initial or catch-up sync period. The default is `10`.
- `email_recipients`: Comma-separated email addresses for CallMiner export job notifications.
- `max_records`: Maximum records to process per file for local testing. Omit this value for full syncs.
- `max_threads`: Maximum number of parallel archive extraction threads. The default is `8`; valid values are `1` through `16`.
- `max_polls`: Maximum number of job polling attempts. The default is `60`, with one poll per minute in normal sync mode.
- `test_job_id`: Existing CallMiner export job ID for local testing. When set, the connector skips job creation and processes the specified job.
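
As a rough illustration of how these parameters might be validated, the sketch below applies the documented defaults and ranges. The `parse_configuration` name and the shape of the returned dictionary are assumptions made for this example; the connector's own `config.py` may differ. Configuration values are treated as strings, matching the placeholders in `configuration.json`.

```python
from datetime import datetime

REQUIRED_KEYS = ("client_id", "client_secret", "initial_start_date", "data_types")


def parse_configuration(configuration: dict) -> dict:
    """Validate string configuration values and apply documented defaults (hypothetical)."""
    missing = [key for key in REQUIRED_KEYS if not configuration.get(key)]
    if missing:
        raise ValueError(f"Missing required configuration values: {', '.join(missing)}")

    # Raises ValueError if the timestamp does not match YYYY-MM-DDTHH:MM:SS.000Z.
    datetime.strptime(configuration["initial_start_date"], "%Y-%m-%dT%H:%M:%S.000Z")

    max_threads = int(configuration.get("max_threads", 8))
    if not 1 <= max_threads <= 16:
        raise ValueError("max_threads must be between 1 and 16")

    data_types = [item.strip() for item in configuration["data_types"].split(",")]
    return {
        "data_types": [item for item in data_types if item],
        "increment_days": int(configuration.get("increment_days", 10)),
        "max_threads": max_threads,
        "max_polls": int(configuration.get("max_polls", 60)),
    }
```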

> Note: When submitting connector code as a [Community Connector](https://github.com/fivetran/fivetran_csdk_connectors/tree/main) in the open-source [Connector SDK repository](https://github.com/fivetran/fivetran_csdk_connectors/tree/main), ensure the `configuration.json` file has placeholder values. When adding the connector to your production repository, ensure that the `configuration.json` file is not checked into version control to protect sensitive information.

## Requirements file

This connector does not require a `requirements.txt` file. It uses Python standard libraries, the Fivetran Connector SDK, and the `requests` library, which are available in the Connector SDK runtime environment.

> Note: [Some packages](https://fivetran.com/docs/connector-sdk/technical-reference#preinstalledpackages) are pre-installed in the Connector SDK runtime environment. To avoid dependency conflicts, do not declare them in your `requirements.txt`.

## Authentication

The connector uses the OAuth2 client credentials flow through the CallMiner identity provider. The `get_access_token` function posts the configured `client_id` and `client_secret` to the token endpoint and stores the returned bearer token for API calls.

Before requesting a token from CallMiner's Identity Provider Service, obtain a client ID and client secret by contacting `support@callminer.com`.

The `refresh_token_if_needed` function refreshes tokens before they expire, applying a five-minute buffer so that long-running export jobs do not send an expired token.
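
The buffer check can be sketched as below. This is an assumption-laden illustration rather than the connector's `refresh_token_if_needed` implementation: the `token_needs_refresh` helper and the idea of tracking expiry as a Unix timestamp are hypothetical.

```python
import time
from typing import Optional

TOKEN_REFRESH_BUFFER_SECONDS = 5 * 60  # five-minute buffer described above


def token_needs_refresh(expires_at: float, now: Optional[float] = None) -> bool:
    """Return True when the token expires within the buffer (hypothetical helper)."""
    current = time.time() if now is None else now
    return expires_at - current <= TOKEN_REFRESH_BUFFER_SECONDS
```

Passing `now` explicitly makes the check easy to test without waiting for a real token to age.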

## Bulk export workflow

The connector syncs CallMiner data through Bulk Export jobs rather than direct row pagination.

1. The `update` function validates and parses configuration.
2. The connector determines whether each configured data type should use an incremental date window or the `LastNHours` option.
3. Data types with the same sync strategy are grouped into one Bulk Export job.
4. The connector polls job history until the job completes, fails, or times out.
5. Completed jobs are downloaded, extracted, and processed into destination tables.
6. The connector checkpoints state after each completed sync period.
7. Completed jobs are deleted after checkpointing.

If a job times out, the connector stores the job details in `pending_job` and checkpoints state before raising an error. The next sync resumes polling the same job before continuing normal sync work.
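
A simplified view of the polling and pending-job behavior is sketched here. The `poll_job` function, its parameters, and the contents of the `pending_job` entry are assumptions for illustration; only the status strings and the `status` key come from `check_job_status`. In the real connector, the state would be checkpointed before the timeout error is raised.

```python
import time


def poll_job(job_id, fetch_status, state, max_polls=60, poll_interval=60, sleep=time.sleep):
    """Poll until the job completes or fails, or record it as pending (hypothetical)."""
    for attempt in range(max_polls):
        result = fetch_status(job_id)
        status = result.get("status")
        if status == "Completed":
            # A finished job is no longer pending.
            state.pop("pending_job", None)
            return result
        if status == "Failed":
            raise ValueError(f"Export job {job_id} failed")
        if attempt < max_polls - 1:
            sleep(poll_interval)
    # Timed out: remember the job so the next sync can resume polling it.
    state["pending_job"] = {"job_id": job_id}
    raise TimeoutError(f"Export job {job_id} did not finish after {max_polls} polls")
```

On the next sync, a caller would look for `state["pending_job"]` first and call `poll_job` with that job ID before creating any new export.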

## Data handling

The connector downloads each completed CallMiner export as an outer ZIP file. The outer ZIP can contain metadata JSON and one or more nested `.gz` or `.zip` files with CSV data.

CSV files are processed as streams with `csv.DictReader`. Table names are derived from exported filenames by removing UUID prefixes and file extensions, then normalizing names to lowercase with underscores. Worker threads decompress nested files into temporary CSV files without calling Fivetran SDK operations. The main thread reads those temporary CSV files and delivers records with `op.upsert`.
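
The filename-to-table-name rule might look like the sketch below. The exact filename layout is not documented here, so the UUID pattern, the separator after it, and the `table_name_from_filename` helper are all assumptions.

```python
import re

# Assumed layout: "<uuid>_<NAME>.csv.gz"; the separator after the UUID is a guess.
UUID_PREFIX = re.compile(r"^[0-9a-fA-F]{8}(?:-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}[_-]?")
KNOWN_EXTENSIONS = (".gz", ".zip", ".csv")


def table_name_from_filename(filename: str) -> str:
    """Derive a lowercase, underscore-separated table name (hypothetical helper)."""
    name = filename.rsplit("/", 1)[-1]
    name = UUID_PREFIX.sub("", name)
    # Strip stacked extensions such as ".csv.gz" one at a time.
    stripped = True
    while stripped:
        stripped = False
        for extension in KNOWN_EXTENSIONS:
            if name.lower().endswith(extension):
                name = name[: -len(extension)]
                stripped = True
    # Collapse any run of non-alphanumeric characters into a single underscore.
    return re.sub(r"[^0-9a-z]+", "_", name.lower()).strip("_")
```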

Nested files are extracted in parallel with `ThreadPoolExecutor`. Files are sorted by size before extraction so larger files start earlier. Materialized CSV files are then upserted on the main thread in archive order and removed after processing.
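
The split between worker threads and the main thread can be illustrated with the following sketch. It is deliberately simplified and makes several assumptions: nested payloads are passed in as in-memory gzip bytes, `gzip.decompress` is used instead of streaming, and `handle_row` is a stand-in for delivering a record with `op.upsert`. The `decompress_to_temp_csv` and `process_nested_files` names are hypothetical.

```python
import csv
import gzip
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor


def decompress_to_temp_csv(payload: bytes) -> str:
    """Worker task: write decompressed CSV bytes to a temporary file and return its path."""
    fd, path = tempfile.mkstemp(suffix=".csv")
    with os.fdopen(fd, "wb") as out:
        out.write(gzip.decompress(payload))
    return path


def process_nested_files(nested: dict, handle_row, max_threads: int = 8) -> int:
    """Extract in parallel, then read rows on the calling thread in archive order."""
    # Submit larger payloads first so they start decompressing earlier.
    by_size = sorted(nested, key=lambda name: len(nested[name]), reverse=True)
    rows = 0
    with ThreadPoolExecutor(max_workers=max_threads) as pool:
        futures = {name: pool.submit(decompress_to_temp_csv, nested[name]) for name in by_size}
        for name in nested:  # dict order stands in for archive order
            path = futures[name].result()
            try:
                with open(path, newline="", encoding="utf-8") as handle:
                    for row in csv.DictReader(handle):
                        handle_row(name, row)  # only the calling thread delivers rows
                        rows += 1
            finally:
                os.remove(path)
    return rows
```

Keeping row delivery on the calling thread means the workers never touch SDK operations, which matches the constraint described above.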

## Error handling

The connector implements targeted error handling across authentication, API requests, job polling, and file processing.

- API retries: The `retry_on_500_error` decorator retries requests that fail with HTTP 5xx errors, using exponential backoff.
- Request errors: API request failures are logged and raised with the original `requests` exception.
- Job failures: Failed export jobs raise `ValueError` and are not deleted, allowing manual inspection in CallMiner.
- Job timeouts: Timed-out jobs are saved in state and resumed on the next sync.
- File processing errors: Gzip, ZIP, CSV, encoding, and unexpected file errors are tracked and logged with error statistics.
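
The retry behavior can be approximated with the sketch below. It is not the connector's `retry_on_500_error` implementation, which lives in `auth.py` and is not shown here. The `retry_on_server_error` decorator and the `HTTPStatusError` exception are hypothetical stand-ins that avoid depending on `requests`; only the `max_retries`, `initial_delay`, and `backoff_factor` parameter names are taken from the decorator's usage in `api_client.py`.

```python
import functools
import time


class HTTPStatusError(Exception):
    """Hypothetical stand-in for an HTTP error that carries a status code."""

    def __init__(self, status_code: int):
        super().__init__(f"HTTP {status_code}")
        self.status_code = status_code


def retry_on_server_error(max_retries=3, initial_delay=1.0, backoff_factor=2.0, sleep=time.sleep):
    """Retry the wrapped call on 5xx errors, multiplying the delay after each attempt."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            delay = initial_delay
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except HTTPStatusError as exc:
                    is_server_error = 500 <= exc.status_code < 600
                    if not is_server_error or attempt == max_retries:
                        raise  # client errors and exhausted retries propagate
                    sleep(delay)
                    delay *= backoff_factor

        return wrapper

    return decorator
```

With the values used in `api_client.py` (`max_retries=3`, `initial_delay=1`, `backoff_factor=2`), this sketch would wait 1, 2, and 4 seconds between attempts.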

## Tables created

The connector defines primary keys for known CallMiner export tables in the `schema` function. Columns are inferred from the CSV headers returned by CallMiner.

| Table name | Primary key |
| ---------- | ----------- |
| `AI-SUMMARIES` | `contact_id` |
| `COMMENTS` | `comment_id` |
| `CONTACTS` | `id` |
| `CATEGORIES` | `contact_id`, `category_id`, `section_id` |
| `CATEGORY-COMPONENTS` | `contact_id`, `category_id`, `component_id`, `start_time` |
| `EVENTS-DELAY` | `contact_id`, `start_time`, `end_time` |
| `EVENTS-OVERTALK` | `contact_id`, `start_time`, `end_time` |
| `EVENTS-REDACTION` | `contact_id`, `start_time`, `end_time` |
| `EVENTS-SILENCE` | `contact_id`, `start_time`, `end_time` |
| `SCORES` | `contact_id`, `score_id` |
| `SCORE-INDICATORS` | `contact_id`, `score_id`, `score_component_id` |
| `TAGS` | `contact_id`, `tag_id` |
| `TRANSCRIPTS` | `contact_id`, `start_time` |

## Additional files

- `auth.py`: Handles OAuth2 token requests, token refresh, and retry behavior for 500-level API errors.
- `api_client.py`: Creates export jobs, retrieves job history, checks job status, and deletes completed jobs.
- `config.py`: Validates and parses connector configuration values.
- `file_processing.py`: Downloads export files, extracts nested archives, processes CSV streams, and tracks file processing errors.
- `state.py`: Reads and updates per-data-type sync state.
- `sync.py`: Determines sync strategy, orchestrates export jobs, polls job status, checkpoints progress, and resumes pending jobs.

## Additional considerations

The examples provided are intended to help you effectively use Fivetran's Connector SDK. While we've tested the code, Fivetran cannot be held responsible for any unexpected or negative consequences that may arise from using these examples. For inquiries, please reach out to our Support team.
205 changes: 205 additions & 0 deletions callminer/api_client.py
@@ -0,0 +1,205 @@
"""
CallMiner API client functions for job management.
"""

import requests
from typing import Dict, Any
from fivetran_connector_sdk import Logging as log
from auth import retry_on_500_error


@retry_on_500_error(max_retries=3, initial_delay=1, backoff_factor=2)
def create_job(
    bearer_token: str,
    start_date: str = None,
    end_date: str = None,
    last_n_hours: int = None,
    data_types: list = None,
    email_recipients: list = None,
) -> Dict[str, Any]:
    """
    Create a new export job for contacts.

    Args:
        bearer_token: Bearer token for authentication
        start_date: Start date in ISO format (e.g., "2025-09-01T00:00:00.000Z")
        end_date: End date in ISO format (e.g., "2025-10-01T00:00:00.000Z")
        last_n_hours: Number of hours to look back (alternative to date range)
        data_types: List of data types to export (default: ["Contacts"])
        email_recipients: List of email addresses for notifications

    Returns:
        Dictionary containing:
        - job_id: The job ID
        - create_date: The job creation date
        - full_response: Complete API response
    """
    url = "https://api.callminer.net/bulkexport/api/export/job"

    headers = {"Authorization": f"Bearer {bearer_token}", "Content-Type": "application/json"}

    if data_types is None:
        data_types = ["Contacts"]

    if email_recipients is None:
        email_recipients = []

    # Generate descriptive name based on data types
    if len(data_types) == 1:
        job_name = f"{data_types[0]} Export"
    else:
        job_name = f"Bulk Export ({len(data_types)} data types)"

    # Build duration based on whether we're using LastNHours or custom date range
    if last_n_hours is not None:
        # Use LastNHours for subsequent syncs
        duration = {
            "LastNDays": None,
            "LastNHours": last_n_hours,
            "TimeFrame": None,
            "StartDate": None,
            "EndDate": None,
            "SearchMode": "NewAndUpdated",
        }
        log.info(f"Creating job for last {last_n_hours} hours")
    else:
        # Use custom date range for initial sync
        duration = {
            "LastNDays": None,
            "LastNHours": None,
            "TimeFrame": "Custom",
            "StartDate": start_date,
            "EndDate": end_date,
            "SearchMode": "NewAndUpdated",
        }
        log.info(f"Creating job for date range: {start_date} to {end_date}")

    payload = {
        "Name": job_name,
        "Duration": duration,
        "DataTypes": data_types,
        "SearchFilters": [],
        "NotificationMethod": "Email",
        "EmailRecipients": email_recipients,
    }

    try:
        response = requests.post(url, headers=headers, json=payload, timeout=(30, 60))
        response.raise_for_status()

        job_data = response.json()
        job_id = job_data.get("Id")
        create_date = job_data.get("CreateDate")

        if not job_id:
            log.error(f"No job ID in response: {job_data}")
            raise ValueError("Failed to get job ID from response")

        log.info(f"Successfully created job with ID: {job_id}")

        return {"job_id": job_id, "create_date": create_date, "full_response": job_data}

    except requests.exceptions.RequestException as e:
        log.error(f"Error creating job: {e}")
        raise


@retry_on_500_error(max_retries=3, initial_delay=1, backoff_factor=2)
def get_jobs_history(bearer_token: str) -> list:
    """
    Get the history of all jobs.

    Args:
        bearer_token: Bearer token for authentication

    Returns:
        List of job dictionaries containing job history
    """
    url = "https://api.callminer.net/bulkexport/api/export/history"

    headers = {"Authorization": f"Bearer {bearer_token}"}

    try:
        response = requests.get(url, headers=headers, timeout=(30, 60))
        response.raise_for_status()

        jobs = response.json()

        # Fall back to an empty list if the response body is not a JSON array
        if not isinstance(jobs, list):
            jobs = []

        log.info(f"Retrieved {len(jobs)} jobs from history")

        return jobs

    except requests.exceptions.RequestException as e:
        log.error(f"Error getting jobs history: {e}")
        raise


@retry_on_500_error(max_retries=3, initial_delay=1, backoff_factor=2)
def delete_job(job_id: str, bearer_token: str) -> None:
    """
    Delete an export job.

    Args:
        job_id: The job ID to delete
        bearer_token: Bearer token for authentication
    """
    url = f"https://api.callminer.net/bulkexport/api/export/job/{job_id}"

    headers = {"Authorization": f"Bearer {bearer_token}"}

    log.info(f"Deleting job: {job_id}")

    try:
        response = requests.delete(url, headers=headers, timeout=(30, 60))
        response.raise_for_status()

        log.info(f"Successfully deleted job: {job_id}")

    except requests.exceptions.RequestException as e:
        log.error(f"Error deleting job {job_id}: {e}")
        # Don't raise - deletion failure shouldn't fail the sync
        # Job will eventually expire on CallMiner's side


def check_job_status(
    job_id: str, bearer_token: str = None, jobs_history: list = None
) -> Dict[str, Any]:
    """
    Check the status of a job by searching the jobs history.

    Args:
        job_id: The ID of the job to check (matches ExportJobId in history)
        bearer_token: Bearer token for authentication
            (optional if jobs_history provided)
        jobs_history: Pre-fetched jobs history (optional, fetches if not provided)

    Returns:
        Dictionary containing job status information with keys:
        - status: Job status (e.g., "Completed", "Processing", "Failed"), or None if not found
        - download_endpoint: Download endpoint, present only when the history entry includes one
        - found: Boolean indicating if job was found in history
        - job_data: The matching history entry, or None if not found
    """
    if jobs_history is None:
        if bearer_token is None:
            raise ValueError("Either jobs_history or bearer_token must be provided")
        jobs_history = get_jobs_history(bearer_token)

    for job in jobs_history:
        export_job_id = job.get("ExportJobId")

        if export_job_id == job_id:
            status = job.get("Status")
            download_endpoint = job.get("DownloadEndpoint")

            result = {"status": status, "found": True, "job_data": job}

            if download_endpoint:
                result["download_endpoint"] = download_endpoint

            return result

    return {"status": None, "found": False, "job_data": None}