From 6c3b460c8f2a8c360077a4e3de9ede70cbbe8c6f Mon Sep 17 00:00:00 2001 From: Remy Gwaramadze Date: Fri, 24 Oct 2025 13:58:50 +0200 Subject: [PATCH] Improve lookup joins --- docs/joins.md | 164 +++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 142 insertions(+), 22 deletions(-) diff --git a/docs/joins.md b/docs/joins.md index 070144e36..dbfbacdab 100644 --- a/docs/joins.md +++ b/docs/joins.md @@ -150,23 +150,19 @@ If you change timestamps of the record during processing, they will be processed !!! warning This is an experimental feature; the API may change in future releases. +`StreamingDataFrame.join_lookup()` is a special type of join that allows you to enrich records in a streaming dataframe with data from external systems. This is particularly useful for enriching streaming data with configuration or reference data from external sources like databases, configuration services, or APIs. -`StreamingDataFrame.join_lookup()` is a special type of join that allows you to enrich records in a streaming dataframe with the data from external systems. +### Key Concepts -You can use it to enriching streaming data with configuration or reference data from an external source, like a database. +Lookup joins work by: -### Example - -To perform a lookup join, you need: - -1. A subclass of [quixstreams.dataframe.joins.lookups.base.BaseLookup](api-reference/dataframe.md#baselookup) to query the external source and cache the results when necessary. -2. A subclass of [quixstreams.dataframe.joins.lookups.base.BaseField](api-reference/dataframe.md#basefield) to define how the data is extracted from the result. -3. To pass the lookup and the fields to the `StreamingDataFrame.join_lookup`. +1. **Lookup Strategy**: A subclass of `BaseLookup` that defines how to query external sources and cache results +2. **Field Definitions**: Subclasses of `BaseField` that specify how to extract and map enrichment data +3. **In-place Enrichment**: Records are updated directly with the enrichment data +### Basic Example with SQLite -See [SQLiteLookup](api-reference/dataframe.md#sqlitelookup) and [SQLiteLookupField](api-reference/dataframe.md#sqlitelookupfield) for the reference implementation. - -Here is an example of lookup join with a SQLite database: +Here's a simple example using SQLite for reference data: ```python from quixstreams import Application @@ -174,17 +170,26 @@ from quixstreams.dataframe.joins.lookups import SQLiteLookup, SQLiteLookupField app = Application(...) -# An implementation of BaseLookup for SQLite -lookup = SQLiteLookup(path="db.db") +# Create a lookup instance for SQLite database +lookup = SQLiteLookup(path="reference_data.db") -sdf = app.dataframe(app.topic("input")) +sdf = app.dataframe(app.topic("sensor-data")) +# Enrich sensor data with reference information sdf = sdf.join_lookup( lookup, - on="column", # A column in StreamingDataFrame to join on + on="device_id", # Column to match on fields={ - # A mapping with SQLite fields to join with - "lookup": SQLiteLookupField(table="table", columns=["column"], on="id"), + "device_info": SQLiteLookupField( + table="devices", + columns=["name", "model", "location"], + on="device_id" + ), + "calibration": SQLiteLookupField( + table="calibrations", + columns=["offset", "scale", "last_calibrated"], + on="device_id" + ) }, ) @@ -192,11 +197,126 @@ if __name__ == '__main__': app.run() ``` -### How it works +### Dynamic Configuration Integration + +For real-time configuration management, use the Quix Configuration Service integration: + +```python +from quixstreams import Application +from quixstreams.dataframe.joins.lookups import QuixConfigurationService + +app = Application() + +# Create a lookup instance pointing to your configuration topic +lookup = QuixConfigurationService( + topic=app.topic("device-configurations"), + app_config=app.config +) + +sdf = app.dataframe(app.topic("sensor-data")) + +# Enrich with dynamic configuration data +sdf = sdf.join_lookup( + lookup=lookup, + on="device_id", + fields={ + "device_config": lookup.json_field( + jsonpath="$.device", + type="device-config" + ), + "calibration_params": lookup.json_field( + jsonpath="$.calibration", + type="device-config" + ), + "firmware_version": lookup.json_field( + jsonpath="$.firmware.version", + type="device-config" + ) + } +) + +if __name__ == '__main__': + app.run() +``` + +### Advanced Configuration Matching + +Use custom key matching logic for complex scenarios: + +```python +def custom_key_matcher(value, key): + """Custom logic to determine configuration key""" + device_type = value.get("device_type", "unknown") + location = value.get("location", "default") + return f"{device_type}-{location}" + + +# Use custom key matching +sdf = sdf.join_lookup( + lookup=lookup, + on=custom_key_matcher, + fields={ + "location_config": lookup.json_field( + jsonpath="$", + type="location-config" + ) + } +) +``` + +### Binary Data Support + +For non-JSON configurations (firmware files, calibration data, etc.): + +```python +sdf = sdf.join_lookup( + lookup=lookup, + on="device_id", + fields={ + "firmware_binary": lookup.bytes_field( + type="firmware" + ), + "calibration_data": lookup.bytes_field( + type="calibration" + ) + } +) +``` + +### How Lookup Joins Work + +1. **Record Processing**: For each record in the dataframe, the lookup strategy is called with the matching key and field definitions +2. **External Query**: The lookup strategy queries the external source (database, API, configuration service) based on the key +3. **Data Extraction**: Field definitions specify how to extract and map the enrichment data from the external source +4. **In-place Update**: The record is updated directly with the enrichment data +5. **Caching**: Results are cached locally to minimize external calls and improve performance + +### Performance Considerations + +- **Caching**: Lookup results are cached to minimize external API calls +- **Batch Processing**: Consider batching multiple lookups for better performance +- **Error Handling**: Implement proper fallback behavior for missing data +- **Memory Usage**: Be mindful of cache size for large datasets + +### Use Cases + +- **Configuration Enrichment**: Enrich streaming data with device configurations, calibration parameters, or system settings +- **Reference Data**: Join with lookup tables for device information, user profiles, or product catalogs +- **Real-time Updates**: Use with Dynamic Configuration for real-time configuration updates +- **Data Validation**: Enrich with validation rules or business logic +- **Multi-source Enrichment**: Combine data from multiple external sources + +### Integration with Quix Cloud + +For production use cases, consider using the [Quix Dynamic Configuration service](https://quix.io/docs/quix-cloud/managed-services/dynamic-configuration.html) which provides: + +- **Real-time Configuration Updates**: Lightweight Kafka events for configuration changes +- **Version Management**: Automatic versioning and timestamp-based lookups +- **Large File Support**: Handle configuration files too large for direct Kafka streaming +- **Binary Data Support**: Support for both JSON and binary configuration content +- **High Performance**: Optimized for high-throughput streaming applications -- For each record in the dataframe, a user-defined lookup strategy (a subclass of `BaseLookup`) is called with a mapping of field names to field definitions (subclasses of `BaseField`). -- The lookup strategy fetches or computes enrichment data based on the provided key and fields, and updates the record in-place. -- The enrichment can come from external sources such as configuration topics, databases, or in-memory data. +👉 See the [Dynamic Configuration documentation](https://quix.io/docs/quix-cloud/managed-services/dynamic-configuration.html) for complete setup and usage details. ## Interval join