add Weights and Biases connector example#422
add Weights and Biases connector example#422fivetran-surabhisingh wants to merge 1 commit intomainfrom
Conversation
🧹 Python Code Quality Check📎 Download full report from workflow artifacts. 📌 Only Python files changed in this PR were checked. This comment is auto-updated with every commit. |
There was a problem hiding this comment.
Pull Request Overview
This PR adds a new connector for Weights & Biases (W&B) that synchronizes experiment run metadata and metric time-series data. The connector implements incremental synchronization using timestamp-based watermarking, pagination for large datasets, and robust error handling with exponential backoff.
Key changes:
- New W&B connector with API integration for runs and metrics endpoints
- Implements incremental sync using
updatedAttimestamp tracking - Includes retry logic with exponential backoff for transient errors
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 16 comments.
| File | Description |
|---|---|
| connectors/weights_and_biases/connector.py | Main connector implementation with update/schema functions, API client with retry logic, and incremental sync handling |
| connectors/weights_and_biases/configuration.json | Configuration template with API key, entity, and project parameters using proper placeholder format |
| connectors/weights_and_biases/README.md | Comprehensive documentation covering connector overview, features, authentication, data handling, and table schemas |
| def validate_configuration(configuration: dict): | ||
| """Ensure required config keys are present.""" | ||
| required = ["api_key", "entity", "project"] | ||
| for key in required: | ||
| if key not in configuration or not configuration[key]: | ||
| raise ValueError(f"Missing required configuration value: {key}") |
There was a problem hiding this comment.
The validate_configuration() function must be removed when a configuration.json file exists. The SDK automatically validates required fields from configuration.json, making this function redundant and violating SDK v2+ best practices.
| # The schema function takes one parameter: | ||
| # - configuration: a dictionary that holds the configuration settings for the connector. | ||
| def schema(configuration: dict): | ||
| """Define schema tables.""" |
There was a problem hiding this comment.
The schema function docstring must use the exact required format. Replace with: 'Define the schema function which lets you configure the schema your connector delivers. See the technical reference documentation for more details on the schema function: https://fivetran.com/docs/connectors/connector-sdk/technical-reference#schema'
| """Define schema tables.""" | |
| """ | |
| Define the schema function which lets you configure the schema your connector delivers. | |
| See the technical reference documentation for more details on the schema function: | |
| https://fivetran.com/docs/connectors/connector-sdk/technical-reference#schema | |
| Args: | |
| configuration: a dictionary that holds the configuration settings for the connector. | |
| """ |
| # - configuration: a dictionary that contains any secrets or payloads you configure when deploying the connector | ||
| # - state: a dictionary that contains whatever state you have chosen to checkpoint during the prior sync. | ||
| # The state dictionary is empty for the first sync or for any full re-sync. | ||
| def update(configuration: dict, state: dict): |
There was a problem hiding this comment.
The update function is missing the required docstring. It must include: 'Define the update function which lets you configure how your connector fetches data. See the technical reference documentation for more details on the update function: https://fivetran.com/docs/connectors/connector-sdk/technical-reference#update' with Args section for configuration and state parameters.
| def update(configuration: dict, state: dict): | |
| def update(configuration: dict, state: dict): | |
| """ | |
| Define the update function which lets you configure how your connector fetches data. | |
| See the technical reference documentation for more details on the update function: | |
| https://fivetran.com/docs/connectors/connector-sdk/technical-reference#update | |
| Args: | |
| configuration: a dictionary that holds the configuration settings for the connector. | |
| state: a dictionary that holds the state of the connector. | |
| """ |
| def update(configuration: dict, state: dict): | ||
| log.info("Starting Weights & Biases connector sync") | ||
|
|
||
| validate_configuration(configuration) |
There was a problem hiding this comment.
Remove this call to validate_configuration(). The SDK handles configuration validation automatically when configuration.json exists.
| api_key = configuration["api_key"] | ||
| entity = configuration["entity"] | ||
| project = configuration["project"] | ||
| page_size = 100 |
There was a problem hiding this comment.
Convert this to a module-level constant with the proper naming convention: __PAGE_SIZE = 100 (placed after imports with other constants). Remove the variable assignment inside the update function and reference the constant instead.
| # Open the configuration.json file and load its contents into a dictionary. | ||
| with open("configuration.json", "r") as f: | ||
| configuration = json.load(f) | ||
| # Adding this code to your `connector.py` allows you to test your connector by running your file directly from your IDE. |
There was a problem hiding this comment.
The main block comments are incorrect. Line 211 should be removed (comment is redundant), line 214 should be removed, and line 215 should be preceded by: '# Test the connector locally'
| # Open the configuration.json file and load its contents into a dictionary. | |
| with open("configuration.json", "r") as f: | |
| configuration = json.load(f) | |
| # Adding this code to your `connector.py` allows you to test your connector by running your file directly from your IDE. | |
| with open("configuration.json", "r") as f: | |
| configuration = json.load(f) | |
| # Test the connector locally |
| | `api_key` | Required. Your W&B API key for authentication. | | ||
| | `entity` | Required. The W&B user or organization name. | | ||
| | `project` | Required. The project name in which your runs are logged. | | ||
| | `page_size` | Optional. The number of records to retrieve per request (default: 100). | |
There was a problem hiding this comment.
The page_size parameter is documented in the README but does not exist in configuration.json. Either add it to configuration.json or remove this row from the README. Configuration in the README must exactly match configuration.json.
| ## Additional considerations | ||
| - API responses may vary based on your account type and workspace access. | ||
| - Incremental syncs help minimize API calls for large projects. | ||
| - If your project contains thousands of runs, increase `page_size` to optimize sync performance. | ||
| - This connector is designed for **educational and demonstration purposes** using the **Fivetran Connector SDK**. | ||
|
|
||
| For questions or feedback, please contact **Fivetran Support**. |
There was a problem hiding this comment.
The Additional considerations section must contain the exact required disclaimer. Replace the entire section content with: 'The examples provided are intended to help you effectively use Fivetran's Connector SDK. While we've tested the code, Fivetran cannot be held responsible for any unexpected or negative consequences that may arise from using these examples. For inquiries, please reach out to our Support team.'
| - API responses may vary based on your account type and workspace access. | ||
| - Incremental syncs help minimize API calls for large projects. | ||
| - If your project contains thousands of runs, increase `page_size` to optimize sync performance. | ||
| - This connector is designed for **educational and demonstration purposes** using the **Fivetran Connector SDK**. |
There was a problem hiding this comment.
Remove bold text formatting. Bold should only be used for UI element names (tabs, menus, buttons, fields). Change to plain text: 'This connector is designed for educational and demonstration purposes using the Fivetran Connector SDK.'
| return dt.datetime.utcnow().replace(tzinfo=dt.timezone.utc).isoformat() | ||
|
|
||
|
|
||
| def make_api_request(url: str, headers: dict, params: Optional[dict] = None) -> dict: |
There was a problem hiding this comment.
Mixing implicit and explicit returns may indicate an error, as implicit returns always return None.
fivetran-chinmayichandrasekar
left a comment
There was a problem hiding this comment.
@fivetran-surabhisingh Left a few suggestions. The main Readme.md file is missing.
| This connector demonstrates how to fetch experiment and metric data from [Weights & Biases (W&B)](https://wandb.ai/) and upsert it into your destination using the **Fivetran Connector SDK**. | ||
| It synchronizes **run metadata** and **metric time-series data** from W&B projects, supports incremental synchronization via timestamps, and includes comprehensive retry and error handling logic for large-scale ML experiment tracking. |
There was a problem hiding this comment.
| This connector demonstrates how to fetch experiment and metric data from [Weights & Biases (W&B)](https://wandb.ai/) and upsert it into your destination using the **Fivetran Connector SDK**. | |
| It synchronizes **run metadata** and **metric time-series data** from W&B projects, supports incremental synchronization via timestamps, and includes comprehensive retry and error handling logic for large-scale ML experiment tracking. | |
| This connector demonstrates how to fetch experiment and metric data from [Weights & Biases (W&B)](https://wandb.ai/) and upsert it into your destination using the Fivetran Connector SDK. | |
| It synchronizes run metadata and metric time-series data from W&B projects, supports incremental synchronization via timestamps, and includes comprehensive retry and error handling logic for large-scale ML experiment tracking. |
|
|
||
|
|
||
| ## Authentication | ||
| The connector uses **Bearer Token authentication** to connect securely to the W&B API. |
There was a problem hiding this comment.
| The connector uses **Bearer Token authentication** to connect securely to the W&B API. | |
| The connector uses Bearer Token authentication to connect securely to the W&B API. |
| ## Data handling | ||
| The connector performs the following operations: | ||
|
|
||
| 1. **Runs Table** | ||
| - Fetches experiment metadata such as: | ||
| - Run ID | ||
| - User | ||
| - State | ||
| - Creation and update timestamps | ||
| - Tags, config, and summary metrics | ||
| - Converts nested objects (`tags`, `config`, `summaryMetrics`) into JSON strings for storage. | ||
| - Performs incremental filtering using the `updatedAt` field to avoid re-fetching unchanged runs. | ||
|
|
||
| 2. **Metrics Table** | ||
| - For each run, retrieves metric history including: | ||
| - Step number | ||
| - Metric name | ||
| - Metric value | ||
| - Timestamp | ||
| - Each metric record is upserted to maintain a complete time-series view. | ||
|
|
||
| 3. **Incremental Sync** | ||
| - Tracks the latest synchronization timestamp (`runs_hwm_utc`) to fetch only new or updated runs during subsequent syncs. | ||
| - Uses checkpointing to store this state. | ||
|
|
||
| 4. **Checkpointing** | ||
| - Saves synchronization progress after each run: | ||
| ```python | ||
| op.checkpoint({"runs_hwm_utc": current_hwm}) | ||
| ``` | ||
|
|
||
| 5. **Upserts** | ||
| - Inserts or updates data in Fivetran using: | ||
| ```python | ||
| op.upsert("runs", run_record) |
There was a problem hiding this comment.
| ## Data handling | |
| The connector performs the following operations: | |
| 1. **Runs Table** | |
| - Fetches experiment metadata such as: | |
| - Run ID | |
| - User | |
| - State | |
| - Creation and update timestamps | |
| - Tags, config, and summary metrics | |
| - Converts nested objects (`tags`, `config`, `summaryMetrics`) into JSON strings for storage. | |
| - Performs incremental filtering using the `updatedAt` field to avoid re-fetching unchanged runs. | |
| 2. **Metrics Table** | |
| - For each run, retrieves metric history including: | |
| - Step number | |
| - Metric name | |
| - Metric value | |
| - Timestamp | |
| - Each metric record is upserted to maintain a complete time-series view. | |
| 3. **Incremental Sync** | |
| - Tracks the latest synchronization timestamp (`runs_hwm_utc`) to fetch only new or updated runs during subsequent syncs. | |
| - Uses checkpointing to store this state. | |
| 4. **Checkpointing** | |
| - Saves synchronization progress after each run: | |
| ```python | |
| op.checkpoint({"runs_hwm_utc": current_hwm}) | |
| ``` | |
| 5. **Upserts** | |
| - Inserts or updates data in Fivetran using: | |
| ```python | |
| op.upsert("runs", run_record) | |
| ## Data handling | |
| The connector performs the following operations: | |
| - **Runs Table** | |
| - Fetches experiment metadata such as: | |
| - Run ID | |
| - User | |
| - State | |
| - Creation and update timestamps | |
| - Tags, config, and summary metrics | |
| - Converts nested objects (`tags`, `config`, `summaryMetrics`) into JSON strings for storage. | |
| - Performs incremental filtering using the `updatedAt` field to avoid re-fetching unchanged runs. | |
| - **Metrics Table** | |
| - For each run, retrieves metric history including: | |
| - Step number | |
| - Metric name | |
| - Metric value | |
| - Timestamp | |
| - Each metric record is upserted to maintain a complete time-series view. | |
| - **Incremental Sync** | |
| - Tracks the latest synchronization timestamp (`runs_hwm_utc`) to fetch only new or updated runs during subsequent syncs. | |
| - Uses checkpointing to store this state. | |
| - **Checkpointing** | |
| - Saves synchronization progress after each run: | |
| ```python | |
| op.checkpoint({"runs_hwm_utc": current_hwm}) | |
| ``` | |
| - **Upserts** | |
| - Inserts or updates data in Fivetran using: | |
| ```python | |
| op.upsert("runs", run_record) |
fivetran-rishabhghosh
left a comment
There was a problem hiding this comment.
Please address copilot comments
fivetran-sahilkhirwal
left a comment
There was a problem hiding this comment.
Please check for failing checks and existing review comments and re-request the review :)
|
Surabhi Singh seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account. You have signed the CLA already but the status is still pending? Let us recheck it. |
Jira ticket
Closes
<ADD TICKET LINK HERE, EACH PR MUST BE LINKED TO A JIRA TICKET>Description of Change
<MENTION A SHORT DESCRIPTION OF YOUR CHANGES HERE>Testing
<MENTION ABOUT YOUR TESTING DETAILS HERE, ATTACH SCREENSHOTS IF NEEDED (WITHOUT PII)>Checklist
Some tips and links to help validate your PR:
fivetran debugcommand.