164 changes: 142 additions & 22 deletions docs/joins.md
@@ -150,53 +150,173 @@ If you change timestamps of the record during processing, they will be processed
!!! warning
    This is an experimental feature; the API may change in future releases.

`StreamingDataFrame.join_lookup()` is a special type of join that enriches records in a streaming dataframe with data from external systems. It is particularly useful for adding configuration or reference data from external sources such as databases, configuration services, or APIs.

### Key Concepts

To perform a lookup join, you need:

1. **Lookup Strategy**: A subclass of [quixstreams.dataframe.joins.lookups.base.BaseLookup](api-reference/dataframe.md#baselookup) that defines how to query the external source and cache the results when necessary.
2. **Field Definitions**: Subclasses of [quixstreams.dataframe.joins.lookups.base.BaseField](api-reference/dataframe.md#basefield) that specify how to extract and map the enrichment data.
3. **In-place Enrichment**: The lookup and the fields are passed to `StreamingDataFrame.join_lookup()`, which updates each record directly with the enrichment data.

### Basic Example with SQLite

See [SQLiteLookup](api-reference/dataframe.md#sqlitelookup) and [SQLiteLookupField](api-reference/dataframe.md#sqlitelookupfield) for the reference implementation.

Here's a simple example of a lookup join that uses a SQLite database for reference data:

```python
from quixstreams import Application
from quixstreams.dataframe.joins.lookups import SQLiteLookup, SQLiteLookupField

app = Application(...)

# Create a lookup instance for the SQLite database
lookup = SQLiteLookup(path="reference_data.db")

sdf = app.dataframe(app.topic("sensor-data"))

# Enrich sensor data with reference information
sdf = sdf.join_lookup(
    lookup,
    on="device_id",  # Column in the StreamingDataFrame to match on
    fields={
        # A mapping of output field names to SQLite fields to join with
        "device_info": SQLiteLookupField(
            table="devices",
            columns=["name", "model", "location"],
            on="device_id"
        ),
        "calibration": SQLiteLookupField(
            table="calibrations",
            columns=["offset", "scale", "last_calibrated"],
            on="device_id"
        )
    },
)

if __name__ == '__main__':
    app.run()
```
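The example above assumes that `reference_data.db` already contains `devices` and `calibrations` tables keyed by `device_id`. As a minimal sketch, such tables could be created with Python's built-in `sqlite3` module. The schema, column types, and sample rows here are hypothetical and only inferred from the column names used in the example.

```python
import sqlite3


def create_reference_tables(conn: sqlite3.Connection) -> None:
    """Create hypothetical `devices` and `calibrations` tables with one sample row each."""
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "CREATE TABLE IF NOT EXISTS devices ("
            "device_id TEXT PRIMARY KEY, name TEXT, model TEXT, location TEXT)"
        )
        # "offset" is quoted because OFFSET is also an SQL keyword
        conn.execute(
            "CREATE TABLE IF NOT EXISTS calibrations ("
            'device_id TEXT PRIMARY KEY, "offset" REAL, scale REAL, '
            "last_calibrated TEXT)"
        )
        conn.execute(
            "INSERT OR REPLACE INTO devices VALUES (?, ?, ?, ?)",
            ("dev-1", "Boiler sensor", "TX-100", "plant-a"),
        )
        conn.execute(
            "INSERT OR REPLACE INTO calibrations VALUES (?, ?, ?, ?)",
            ("dev-1", 0.5, 1.02, "2024-01-01"),
        )
```

To prepare the file used above, the function could be called once as `create_reference_tables(sqlite3.connect("reference_data.db"))` before starting the application.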

### Dynamic Configuration Integration

For real-time configuration management, use the Quix Configuration Service integration:

```python
from quixstreams import Application
from quixstreams.dataframe.joins.lookups import QuixConfigurationService

app = Application()

# Create a lookup instance pointing to your configuration topic
lookup = QuixConfigurationService(
    topic=app.topic("device-configurations"),
    app_config=app.config
)

sdf = app.dataframe(app.topic("sensor-data"))

# Enrich with dynamic configuration data
sdf = sdf.join_lookup(
    lookup=lookup,
    on="device_id",
    fields={
        "device_config": lookup.json_field(
            jsonpath="$.device",
            type="device-config"
        ),
        "calibration_params": lookup.json_field(
            jsonpath="$.calibration",
            type="device-config"
        ),
        "firmware_version": lookup.json_field(
            jsonpath="$.firmware.version",
            type="device-config"
        )
    }
)

if __name__ == '__main__':
    app.run()
```
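To make the `jsonpath` arguments more concrete, the sketch below resolves the same three paths against a hypothetical configuration document using plain dictionary traversal. It handles only `$` and simple dotted paths such as `$.firmware.version`, not full JSONPath, and it is not how the library evaluates paths; the payload shape and the helper name are assumptions for illustration.

```python
from typing import Any


def resolve_simple_path(document: dict, jsonpath: str) -> Any:
    """Resolve "$" or "$.a.b" style paths; return None when a key is missing."""
    if jsonpath == "$":
        return document
    if not jsonpath.startswith("$."):
        raise ValueError(f"unsupported path: {jsonpath!r}")
    current: Any = document
    for part in jsonpath[2:].split("."):
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


# Hypothetical configuration payload for one device
config = {
    "device": {"name": "Boiler sensor", "model": "TX-100"},
    "calibration": {"offset": 0.5, "scale": 1.02},
    "firmware": {"version": "2.3.1"},
}

device_config = resolve_simple_path(config, "$.device")
calibration_params = resolve_simple_path(config, "$.calibration")
firmware_version = resolve_simple_path(config, "$.firmware.version")
```

With this payload, `$.device` and `$.calibration` select whole sub-objects, while `$.firmware.version` selects a single scalar value.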

### Advanced Configuration Matching

Use custom key matching logic for complex scenarios:

```python
def custom_key_matcher(value, key):
    """Custom logic to determine configuration key"""
    device_type = value.get("device_type", "unknown")
    location = value.get("location", "default")
    return f"{device_type}-{location}"


# Use custom key matching
sdf = sdf.join_lookup(
    lookup=lookup,
    on=custom_key_matcher,
    fields={
        "location_config": lookup.json_field(
            jsonpath="$",
            type="location-config"
        )
    }
)
```
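To check which configuration keys a matcher like this produces, it can be called directly on sample data before wiring it into the pipeline. The sketch below repeats the function so that it runs on its own; the message keys and record values are hypothetical.

```python
def custom_key_matcher(value: dict, key) -> str:
    """Build a configuration key from the record's device type and location."""
    device_type = value.get("device_type", "unknown")
    location = value.get("location", "default")
    return f"{device_type}-{location}"


# Hypothetical (message key, message value) pairs
records = [
    ("dev-1", {"device_type": "thermostat", "location": "berlin"}),
    ("dev-2", {"device_type": "thermostat"}),  # no location set
    ("dev-3", {"temperature": 21.5}),          # neither field set
]

config_keys = [custom_key_matcher(value, key) for key, value in records]
```

Records that lack `device_type` or `location` fall back to the `unknown` and `default` segments, so a configuration stored under a key such as `thermostat-default` would apply to them.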

### Binary Data Support

For non-JSON configurations (firmware files, calibration data, etc.):

```python
sdf = sdf.join_lookup(
    lookup=lookup,
    on="device_id",
    fields={
        "firmware_binary": lookup.bytes_field(
            type="firmware"
        ),
        "calibration_data": lookup.bytes_field(
            type="calibration"
        )
    }
)
```

### How Lookup Joins Work

1. **Record Processing**: For each record in the dataframe, the lookup strategy is called with the matching key and field definitions
2. **External Query**: The lookup strategy queries the external source (database, API, configuration service) based on the key
3. **Data Extraction**: Field definitions specify how to extract and map the enrichment data from the external source
4. **In-place Update**: The record is updated directly with the enrichment data
5. **Caching**: Results are cached locally to minimize external calls and improve performance
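
The steps above can be sketched in plain Python without any Quix Streams imports. `ColumnsField` and `CachedLookup` are hypothetical stand-ins for a field definition and a lookup strategy, not the library's classes, and the in-memory `DEVICES` dictionary stands in for the external source.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Tuple


@dataclass(frozen=True)
class ColumnsField:
    """Hypothetical field definition: which columns to copy from a source row."""
    columns: Tuple[str, ...]


class CachedLookup:
    """Hypothetical lookup strategy that caches rows fetched from a source."""

    def __init__(self, fetch_row: Callable[[str], Dict[str, Any]]) -> None:
        self._fetch_row = fetch_row  # queries the external source
        self._cache: Dict[str, Dict[str, Any]] = {}
        self.external_calls = 0

    def join(self, fields: Dict[str, ColumnsField], on: str, value: dict) -> None:
        if on not in self._cache:  # query the source only on a cache miss
            self.external_calls += 1
            self._cache[on] = self._fetch_row(on)
        row = self._cache[on]
        for name, field in fields.items():
            # extract the requested columns and update the record in place
            value[name] = {column: row.get(column) for column in field.columns}


# In-memory stand-in for the external source
DEVICES = {"dev-1": {"name": "Boiler sensor", "model": "TX-100", "location": "plant-a"}}

lookup = CachedLookup(fetch_row=lambda device_id: DEVICES.get(device_id, {}))
fields = {"device_info": ColumnsField(columns=("name", "model"))}

records = [
    {"device_id": "dev-1", "temp": 21.5},
    {"device_id": "dev-1", "temp": 22.0},
]
for record in records:
    lookup.join(fields, on=record["device_id"], value=record)
```

Both records end up with a `device_info` entry, but the source is queried only once because the second record is served from the cache.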

### Performance Considerations

- **Caching**: Lookup results are cached to minimize external API calls
- **Batch Processing**: Consider batching multiple lookups for better performance
- **Error Handling**: Implement proper fallback behavior for missing data
- **Memory Usage**: Be mindful of cache size for large datasets
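
As one way to act on the caching, fallback, and memory points above, a custom lookup could keep a bounded cache that evicts the least recently used entry and returns a default for missing keys. This is a minimal sketch under those assumptions; `BoundedLookupCache` is a hypothetical helper, not part of Quix Streams.

```python
from collections import OrderedDict
from typing import Any, Callable, List, Optional


class BoundedLookupCache:
    """Hypothetical LRU cache with a fallback value for missing keys."""

    def __init__(
        self,
        fetch: Callable[[str], Optional[Any]],
        max_size: int = 1000,
        default: Any = None,
    ) -> None:
        self._fetch = fetch
        self._max_size = max_size
        self._default = default
        self._entries: "OrderedDict[str, Any]" = OrderedDict()

    def get(self, key: str) -> Any:
        if key in self._entries:
            self._entries.move_to_end(key)  # mark as most recently used
            return self._entries[key]
        result = self._fetch(key)
        if result is None:
            # Misses are not cached, so data added to the source later is picked up
            return self._default
        self._entries[key] = result
        if len(self._entries) > self._max_size:
            self._entries.popitem(last=False)  # evict the least recently used entry
        return result

    def cached_keys(self) -> List[str]:
        """Return cached keys from least to most recently used."""
        return list(self._entries)


source = {"a": {"scale": 1.0}, "b": {"scale": 1.1}, "c": {"scale": 1.2}}
cache = BoundedLookupCache(source.get, max_size=2, default={"scale": 1.0})

for key in ["a", "b", "a", "c"]:
    cache.get(key)
```

After this sequence the cache holds `a` and `c`: `b` was the least recently used entry when `c` arrived, so it was evicted to stay within `max_size`.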

### Use Cases

- **Configuration Enrichment**: Enrich streaming data with device configurations, calibration parameters, or system settings
- **Reference Data**: Join with lookup tables for device information, user profiles, or product catalogs
- **Real-time Updates**: Use with Dynamic Configuration for real-time configuration updates
- **Data Validation**: Enrich with validation rules or business logic
- **Multi-source Enrichment**: Combine data from multiple external sources

### Integration with Quix Cloud

For production use cases, consider using the [Quix Dynamic Configuration service](https://quix.io/docs/quix-cloud/managed-services/dynamic-configuration.html) which provides:

- **Real-time Configuration Updates**: Lightweight Kafka events for configuration changes
- **Version Management**: Automatic versioning and timestamp-based lookups
- **Large File Support**: Handle configuration files too large for direct Kafka streaming
- **Binary Data Support**: Support for both JSON and binary configuration content
- **High Performance**: Optimized for high-throughput streaming applications

👉 See the [Dynamic Configuration documentation](https://quix.io/docs/quix-cloud/managed-services/dynamic-configuration.html) for complete setup and usage details.
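
To illustrate what a timestamp-based lookup over versioned configuration means, the sketch below picks the latest version whose valid-from time is not after the record's timestamp. It is an assumption-laden illustration of the idea, not the service's implementation; `VersionedConfig` and its methods are hypothetical names, and timestamps are assumed to be integers in milliseconds.

```python
import bisect
from typing import Any, List, Optional


class VersionedConfig:
    """Hypothetical store of configuration versions ordered by valid-from time."""

    def __init__(self) -> None:
        self._valid_from: List[int] = []
        self._contents: List[Any] = []

    def add_version(self, valid_from: int, content: Any) -> None:
        # Keep both lists sorted by valid-from timestamp
        index = bisect.bisect_right(self._valid_from, valid_from)
        self._valid_from.insert(index, valid_from)
        self._contents.insert(index, content)

    def at(self, timestamp: int) -> Optional[Any]:
        """Return the version in effect at `timestamp`, or None if none applies yet."""
        index = bisect.bisect_right(self._valid_from, timestamp)
        return self._contents[index - 1] if index else None


config = VersionedConfig()
config.add_version(2000, {"scale": 1.1})
config.add_version(1000, {"scale": 1.0})
```

A record timestamped `1500` would be enriched with the version valid from `1000`, while a record timestamped before `1000` gets no configuration at all.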

## Interval join
