add redshift_multithreading connector example #440
fivetran-surabhisingh wants to merge 1 commit into main from
Conversation
🧹 Python Code Quality Check
📎 Download full report from workflow artifacts.
📌 Only Python files changed in this PR were checked.
This comment is auto-updated with every commit.
We already have a ready-to-use example present in the examples repo here:
Pull Request Overview
This PR attempts to add a Redshift multithreading connector example, but the implementation uses an incompatible API pattern and is missing critical required files and functionality.
Key issues:
- Uses a decorator-based API (`@connector`, `config.Config()`, `schema.Schema()`) that does not exist in the Fivetran Connector SDK
- Missing all required files: `configuration.json`, `README.md`, and `requirements.txt`
- Violates SDK multithreading guidelines by calling SDK operations from worker threads (see the sketch after this list)
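For illustration, a minimal sketch of the thread-safe pattern those guidelines point to, in which worker threads only fetch rows and every SDK operation runs on the main thread. This is a hedged sketch, not the SDK's prescribed code: `fetch_page` is a hypothetical helper, and only `op.upsert` is taken from the SDK surface described later in this review.

```python
import queue
import threading

from fivetran_connector_sdk import Operations as op


def threaded_fetch(offsets, fetch_page):
    # Workers only fetch; rows are handed to the main thread via a queue.
    results = queue.Queue()

    def worker(offset):
        for row in fetch_page(offset):  # hypothetical helper: returns dicts for one page
            results.put(row)

    threads = [threading.Thread(target=worker, args=(o,)) for o in offsets]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    # All SDK operations happen here, on the main thread.
    while not results.empty():
        op.upsert(table="redshift_table", data=results.get())
```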
```python
import threading
import psycopg2
from fivetran_connector_sdk import connector, config, state, records, log, schema

CONFIG = config.Config(
    host=config.StringField(),
    port=config.IntegerField(default=5439),
    database=config.StringField(),
    user=config.StringField(),
    password=config.SecretField(),
    threads=config.IntegerField(default=4)
)

SCHEMA = schema.Schema(
    name="redshift_table",
    columns={
        "id": schema.StringColumn(),
        "data": schema.JSONColumn(),
    }
)

@connector(
    name="RedshiftMultithreadingConnector",
    version="0.1.0",
    config=CONFIG,
    schema=SCHEMA,
)
def run_connector(ctx: state.Context):
    def worker(offset):
        conn = psycopg2.connect(
            host=ctx.config.host,
            dbname=ctx.config.database,
            user=ctx.config.user,
            password=ctx.config.password,
            port=ctx.config.port,
        )
        cur = conn.cursor()
        cur.execute(f"SELECT * FROM some_table LIMIT 100 OFFSET {offset}")
        for row in cur.fetchall():
            records.write("redshift_table", {"id": row[0], "data": row})
        conn.close()

    threads = []
    for i in range(ctx.config.threads):
        t = threading.Thread(target=worker, args=(i * 100,))
        t.start()
        threads.append(t)

    for t in threads:
        t.join()

    return ctx.update_state({"last_sync": "now"})
```
This connector uses an incorrect API pattern for the Fivetran Connector SDK. The SDK does not support decorator-based connectors with @connector, config.Config(), or schema.Schema() patterns.
The correct pattern requires:
- Import: `from fivetran_connector_sdk import Connector, Operations as op, Logging as log`
- Define an `update(configuration: dict, state: dict)` function
- Define a `schema(configuration: dict)` function
- Initialize: `connector = Connector(update=update, schema=schema)`
Please refer to the template at template_example_connector/connector.py or existing examples like connectors/redshift/simple_redshift_connector/connector.py for the correct structure.
Suggested change (replacing the decorator-based implementation above):

```python
import threading  # For multithreaded data extraction
import psycopg2  # For connecting to Amazon Redshift
import json  # For reading configuration from JSON file

from fivetran_connector_sdk import Connector  # For connector initialization
from fivetran_connector_sdk import Logging as log  # For logging
from fivetran_connector_sdk import Operations as op  # For upsert and checkpoint operations

# Constants for configuration keys and batch size
__BATCH_SIZE = 100  # Number of records per thread
__TABLE_NAME = "redshift_table"
__MAX_THREADS = 16  # Maximum allowed threads


def validate_configuration(configuration: dict):
    """
    Validate the configuration dictionary to ensure it contains all required parameters.
    This function is called at the start of the update method to ensure that the connector
    has all necessary configuration values.
    Args:
        configuration: a dictionary that holds the configuration settings for the connector.
    Raises:
        ValueError: if any required configuration parameter is missing or invalid.
    """
    required_configs = ["host", "port", "database", "user", "password", "threads"]
    for key in required_configs:
        if key not in configuration:
            raise ValueError(f"Missing required configuration value: {key}")
    if not isinstance(configuration["port"], int) or not (0 < configuration["port"] < 65536):
        raise ValueError("Port must be a valid integer between 1 and 65535.")
    if not isinstance(configuration["threads"], int) or not (1 <= configuration["threads"] <= __MAX_THREADS):
        raise ValueError(f"Threads must be an integer between 1 and {__MAX_THREADS}.")
    # Additional validation can be added here as needed


def schema(configuration: dict):
    """
    Define the schema function which lets you configure the schema your connector delivers.
    See the technical reference documentation for more details on the schema function:
    https://fivetran.com/docs/connectors/connector-sdk/technical-reference#schema
    Args:
        configuration: a dictionary that holds the configuration settings for the connector.
    """
    return [
        {
            "table": __TABLE_NAME,
            "primary_key": ["id"],
            "columns": {
                "id": "STRING",
                "data": "JSON"
            }
        }
    ]


def update(configuration: dict, state: dict):
    """
    Define the update function which lets you configure how your connector fetches data.
    See the technical reference documentation for more details on the update function:
    https://fivetran.com/docs/connectors/connector-sdk/technical-reference#update
    Args:
        configuration: a dictionary that holds the configuration settings for the connector.
        state: a dictionary that holds the state of the connector.
    """
    log.warning("Example: DATABASE : Redshift Multithreading")
    validate_configuration(configuration)

    # Use threading to fetch data in parallel from Redshift
    threads = []
    thread_errors = []
    results_lock = threading.Lock()

    def worker(offset):
        """
        Worker function to fetch a batch of records from Redshift.
        Args:
            offset: The offset for the SQL query LIMIT/OFFSET.
        """
        try:
            conn = psycopg2.connect(
                host=configuration["host"],
                dbname=configuration["database"],
                user=configuration["user"],
                password=configuration["password"],
                port=configuration["port"],
            )
            cur = conn.cursor()
            # Fetch a batch of records using LIMIT and OFFSET
            cur.execute(f"SELECT * FROM some_table LIMIT {__BATCH_SIZE} OFFSET {offset}")
            rows = cur.fetchall()
            for row in rows:
                # The 'upsert' operation is used to insert or update data in the destination table.
                # The first argument is the name of the destination table.
                # The second argument is a dictionary containing the record to be upserted.
                op.upsert(
                    table=__TABLE_NAME,
                    data={"id": str(row[0]), "data": row}
                )
            conn.close()
        except Exception as e:
            with results_lock:
                thread_errors.append(str(e))
            log.severe(f"Thread failed with error: {e}")

    num_threads = configuration["threads"]
    for i in range(num_threads):
        t = threading.Thread(target=worker, args=(i * __BATCH_SIZE,))
        t.start()
        threads.append(t)

    for t in threads:
        t.join()

    if thread_errors:
        raise RuntimeError(f"One or more threads failed: {thread_errors}")

    # Save the progress by checkpointing the state. This is important for ensuring that the sync process can resume
    # from the correct position in case of next sync or interruptions.
    # Learn more about how and where to checkpoint by reading our best practices documentation
    # (https://fivetran.com/docs/connectors/connector-sdk/best-practices#largedatasetrecommendation).
    state["last_sync"] = "now"
    op.checkpoint(state)


# Create the connector object using the schema and update functions
connector = Connector(update=update, schema=schema)

# Check if the script is being run as the main module.
# This is Python's standard entry method allowing your script to be run directly from the command line or IDE 'run' button.
# This is useful for debugging while you write your code. Note this method is not called by Fivetran when executing your connector in production.
# Please test using the Fivetran debug command prior to finalizing and deploying your connector.
if __name__ == "__main__":
    # Open the configuration.json file and load its contents
    with open("configuration.json", "r") as f:
        configuration = json.load(f)
    # Test the connector locally
    connector.debug()
```
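On the missing-files point from the overview, a hypothetical configuration.json matching the `validate_configuration` logic above might carry values like the following (shown here as a Python dict; all values are placeholders, and note that some SDK versions expect configuration values to be strings, so check the SDK docs for the exact type requirements):

```python
# Hypothetical configuration.json contents, mirroring validate_configuration above.
example_configuration = {
    "host": "example-cluster.abc123.us-west-2.redshift.amazonaws.com",  # placeholder
    "port": 5439,
    "database": "dev",       # placeholder
    "user": "awsuser",       # placeholder
    "password": "<YOUR_PASSWORD>",  # placeholder secret
    "threads": 4,
}
```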
```python
conn = psycopg2.connect(
    host=ctx.config.host,
    dbname=ctx.config.database,
    user=ctx.config.user,
    password=ctx.config.password,
    port=ctx.config.port,
)
cur = conn.cursor()
cur.execute(f"SELECT * FROM some_table LIMIT 100 OFFSET {offset}")
for row in cur.fetchall():
    records.write("redshift_table", {"id": row[0], "data": row})
conn.close()
```
Missing error handling and retry logic. Database queries can fail due to transient network issues, connection timeouts, or temporary database unavailability. There is no try-except block or retry mechanism with exponential backoff.
Example with retry logic:

```python
import time

__MAX_RETRIES = 5

for attempt in range(__MAX_RETRIES):
    try:
        conn = psycopg2.connect(...)
        cursor = conn.cursor()
        cursor.execute(query)
        break
    except (psycopg2.OperationalError, psycopg2.InterfaceError) as e:
        if attempt == __MAX_RETRIES - 1:
            raise
        sleep_time = min(60, 2 ** attempt)
        log.warning(f"Retry {attempt + 1}/{__MAX_RETRIES} after {sleep_time}s: {e}")
        time.sleep(sleep_time)
```

```python
    port=ctx.config.port,
)
cur = conn.cursor()
cur.execute(f"SELECT * FROM some_table LIMIT 100 OFFSET {offset}")
```
SQL injection vulnerability: The offset parameter is directly interpolated into the SQL query using an f-string without sanitization. While offset comes from controlled code in this example, this pattern is unsafe and violates security best practices.
Use parameterized queries instead:

```python
cur.execute("SELECT * FROM some_table LIMIT %s OFFSET %s", (100, offset))
```
```python
for t in threads:
    t.join()

return ctx.update_state({"last_sync": "now"})
```
Missing checkpointing logic. State is only updated once at the end via ctx.update_state({"last_sync": "now"}), which means:
- If the connector fails mid-sync, all progress is lost
- The state value "now" is not meaningful for incremental syncs
- No progress tracking during the sync
The SDK requires checkpointing at appropriate intervals (e.g., after processing each batch) using op.checkpoint(state) with meaningful state values like last processed record ID or timestamp.
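As a hedged sketch of that batch-level checkpointing (assuming the `op.upsert`/`op.checkpoint` operations shown elsewhere in this review; `fetch_batch` is a hypothetical helper, not part of the SDK):

```python
from fivetran_connector_sdk import Operations as op


def sync_with_checkpoints(configuration: dict, state: dict):
    # Resume from the last checkpointed offset; 0 on the first sync.
    offset = state.get("offset", 0)
    while True:
        rows = fetch_batch(configuration, offset)  # hypothetical: returns a list of row dicts
        if not rows:
            break
        for row in rows:
            op.upsert(table="redshift_table", data=row)
        offset += len(rows)
        state["offset"] = offset
        # Checkpoint after every batch so an interrupted sync can resume here.
        op.checkpoint(state)
```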
```python
def worker(offset):
    conn = psycopg2.connect(
        host=ctx.config.host,
        dbname=ctx.config.database,
        user=ctx.config.user,
        password=ctx.config.password,
        port=ctx.config.port,
    )
    cur = conn.cursor()
    cur.execute(f"SELECT * FROM some_table LIMIT 100 OFFSET {offset}")
    for row in cur.fetchall():
        records.write("redshift_table", {"id": row[0], "data": row})
    conn.close()
```
Missing logging. The SDK requires using `from fivetran_connector_sdk import Logging as log` and logging important events such as:
- Connection establishment: `log.info(f"Connected to Redshift at {host}:{port}")`
- Progress updates: `log.info(f"Processing batch {batch_num}")`
- Retry attempts: `log.warning(f"Retry {attempt}/{max_retries} due to: {error}")`
- Errors: `log.severe(f"Failed to fetch data: {error}")`

Never use Python's built-in logging module or print() statements.
```python
)
cur = conn.cursor()
cur.execute(f"SELECT * FROM some_table LIMIT 100 OFFSET {offset}")
for row in cur.fetchall():
```
Memory safety violation: fetchall() loads the entire result set into memory, which can cause memory overflow with large datasets.
Use server-side cursors with `fetchmany(batch_size)` instead:

```python
__BATCH_SIZE = 1000

cursor = connection.cursor(name='server_side_cursor')
cursor.execute(query)
while True:
    rows = cursor.fetchmany(__BATCH_SIZE)
    if not rows:
        break
    for row in rows:
        op.upsert(table, row_to_dict(row))
    op.checkpoint(state)
```

```python
"""
Redshift Multithreading connector for Fivetran Connector SDK.
Demonstrates threaded extraction from Amazon Redshift.
"""
# ... (rest of the submitted connector, identical to the code quoted in full above)
```
Missing the required validate_configuration() function. This function must validate configuration values for correctness (e.g., port ranges, valid formats, required fields) and raise ValueError with descriptive error messages for invalid configurations.
Example:
```python
def validate_configuration(configuration: dict):
    """
    Validate the configuration dictionary to ensure it contains all required parameters.
    Args:
        configuration: a dictionary that holds the configuration settings for the connector.
    Raises:
        ValueError: if any required configuration parameter is missing or invalid.
    """
    required_configs = ["host", "database", "user", "password"]
    for key in required_configs:
        if key not in configuration:
            raise ValueError(f"Missing required configuration value: {key}")
    port = configuration.get("port", 5439)
    if not isinstance(port, int) or port < 1 or port > 65535:
        raise ValueError(f"Invalid port number: {port}. Must be between 1 and 65535.")
```

This function should be called at the start of the update() function.
```python
conn = psycopg2.connect(
    host=ctx.config.host,
    dbname=ctx.config.database,
    user=ctx.config.user,
    password=ctx.config.password,
    port=ctx.config.port,
)
cur = conn.cursor()
cur.execute(f"SELECT * FROM some_table LIMIT 100 OFFSET {offset}")
for row in cur.fetchall():
    records.write("redshift_table", {"id": row[0], "data": row})
conn.close()
```
Resource leak: Database connections are not properly closed in case of exceptions. If an error occurs during query execution or data processing, the connection remains open.
Use context managers or try-finally blocks:

```python
conn = None
try:
    conn = psycopg2.connect(...)
    cur = conn.cursor()
    # ... process data
finally:
    if conn:
        conn.close()
```

Or use psycopg2's context managers (note that a psycopg2 connection used as a context manager wraps a transaction and does not close the connection on exit, so the try-finally above is still needed if the connection itself must be closed):

```python
with psycopg2.connect(...) as conn:
    with conn.cursor() as cur:
        # ... process data
```

Suggested change:

```python
# Use context managers so the cursor is closed and the transaction is committed or
# rolled back automatically, even if an exception occurs.
with psycopg2.connect(
    host=ctx.config.host,
    dbname=ctx.config.database,
    user=ctx.config.user,
    password=ctx.config.password,
    port=ctx.config.port,
) as conn:
    with conn.cursor() as cur:
        cur.execute(f"SELECT * FROM some_table LIMIT 100 OFFSET {offset}")
        for row in cur.fetchall():
            records.write("redshift_table", {"id": row[0], "data": row})
```
```python
    port=ctx.config.port,
)
cur = conn.cursor()
cur.execute(f"SELECT * FROM some_table LIMIT 100 OFFSET {offset}")
```
The pagination logic with fixed LIMIT/OFFSET is inefficient and can lead to data inconsistency issues:
- Performance: OFFSET becomes slower as the offset increases because the database must scan and skip all previous rows
- Data consistency: If rows are inserted/updated between thread executions, some data may be missed or duplicated
- Not incremental: This approach always reads all data, not just new/updated records
For proper incremental syncs, use:
- A replication key (e.g., an `updated_at` timestamp or auto-incrementing ID)
- WHERE clause filtering: `WHERE updated_at > last_synced_timestamp ORDER BY updated_at`
- Track progress in state: `state["last_updated_at"]`
See connectors/redshift/simple_redshift_connector/connector.py for a proper incremental sync pattern.
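A minimal sketch of that incremental pattern, assuming an `updated_at` column on the source table and the `op` operations from the suggested connector above (the column names, table name, and batch size are illustrative, not taken from the referenced example):

```python
def sync_incrementally(cursor, state: dict):
    # Resume from the last synced timestamp; use an early default for the first sync.
    last_synced = state.get("last_updated_at", "1970-01-01 00:00:00")
    cursor.execute(
        "SELECT id, data, updated_at FROM some_table "
        "WHERE updated_at > %s ORDER BY updated_at",
        (last_synced,),
    )
    while True:
        rows = cursor.fetchmany(100)
        if not rows:
            break
        for row_id, data, updated_at in rows:
            op.upsert(table="redshift_table", data={"id": str(row_id), "data": data})
            state["last_updated_at"] = str(updated_at)
        # Checkpoint per batch so progress survives interruptions.
        op.checkpoint(state)
```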
```python
import threading
import psycopg2
from fivetran_connector_sdk import connector, config, state, records, log, schema
```
Import of 'log' is not used.
Suggested change:

```python
from fivetran_connector_sdk import connector, config, state, records, schema
```
fivetran-rishabhghosh
left a comment
Please address the Copilot comments and add a description.
Surabhi Singh seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you already have a GitHub account, please add the email address used for this commit to your account. You have signed the CLA already but the status is still pending? Let us recheck it.
Jira ticket
Closes <ADD TICKET LINK HERE, EACH PR MUST BE LINKED TO A JIRA TICKET>

Description of Change
<MENTION A SHORT DESCRIPTION OF YOUR CHANGES HERE>

Testing
<MENTION ABOUT YOUR TESTING DETAILS HERE, ATTACH SCREENSHOTS IF NEEDED (WITHOUT PII)>

Checklist
Some tips and links to help validate your PR:
- Test your connector locally using the `fivetran debug` command.