Commit 0bd5fe0

Add lookup join example (#656)
1 parent 16c763d commit 0bd5fe0

1 file changed

+118
-0
lines changed

managed/dynamic-configuration/README.md

Lines changed: 118 additions & 0 deletions
@@ -79,3 +79,121 @@ The blob storage configuration is automatically injected only when `contentStore
## Learn More

For complete details, check our [official documentation](https://quix.io/docs/quix-cloud/managed-services/dynamic-configuration.html).

## Using with Quix Streams join_lookup

The Dynamic Configuration Manager integrates with Quix Streams' `join_lookup` feature to enrich streaming data with configuration data in real time.

### Basic Example

Here's how to use Dynamic Configuration with `join_lookup` to enrich sensor data with device configurations:

```python
from quixstreams import Application
from quixstreams.dataframe.joins.lookups import QuixConfigurationService

# Initialize the application
app = Application()

# Create a lookup instance pointing to your configuration topic
lookup = QuixConfigurationService(
    topic=app.topic("device-configurations"),
    app_config=app.config
)

# Create your main data stream
sdf = app.dataframe(app.topic("sensor-data"))

# Enrich sensor data with device configuration
sdf = sdf.join_lookup(
    lookup=lookup,
    on="device_id",  # The field to match on
    fields={
        "device_name": lookup.json_field(
            jsonpath="$.device.name",
            type="device-config"
        ),
        "calibration_params": lookup.json_field(
            jsonpath="$.calibration",
            type="device-config"
        ),
        "firmware_version": lookup.json_field(
            jsonpath="$.firmware.version",
            type="device-config"
        )
    }
)

# Process the enriched data
sdf = sdf.apply(lambda value: {
    **value,
    "device_info": f"{value['device_name']} (v{value['firmware_version']})"
})

# Output to destination topic
sdf.to_topic(app.topic("enriched-sensor-data"))

if __name__ == "__main__":
    app.run()
```
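
To make the field mapping concrete, the sketch below mimics what the three `json_field` entries would select from a single configuration document. The sample configuration, the sample record, and the `resolve_simple_path` helper are all hypothetical and use plain dictionaries; the real lookup is performed by `QuixConfigurationService`, not by this simplified dotted-key helper.

```python
# Hypothetical configuration content for one device (illustrative only).
sample_config = {
    "device": {"name": "Boiler sensor"},
    "calibration": {"offset": 0.5, "scale": 1.02},
    "firmware": {"version": "2.4.1"},
}


def resolve_simple_path(document, jsonpath):
    """Resolve a path such as "$.device.name" against nested dicts.

    Simplified stand-in for JSONPath: handles "$" and plain keys only.
    """
    current = document
    for part in jsonpath.lstrip("$").split("."):
        if part:  # skip the empty segment left by the leading "$."
            current = current[part]
    return current


# Same output-field-to-path mapping as in the example above.
field_paths = {
    "device_name": "$.device.name",
    "calibration_params": "$.calibration",
    "firmware_version": "$.firmware.version",
}

# Hypothetical incoming sensor record.
record = {"device_id": "dev-1", "temperature": 21.7}

# Merge the selected configuration values into the record.
enriched = {
    **record,
    **{name: resolve_simple_path(sample_config, path)
       for name, path in field_paths.items()},
}
enriched["device_info"] = (
    f"{enriched['device_name']} (v{enriched['firmware_version']})"
)
print(enriched)
```

The enriched record keeps its original fields and gains one key per entry in `fields`, which is why the later `apply` step can read `device_name` and `firmware_version` directly.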

### Advanced Configuration Matching

For more complex scenarios, pass a callable as `on` to derive the lookup key from each record's value and message key:

```python
def custom_key_matcher(value, key):
    """Custom logic to determine configuration key"""
    device_type = value.get("device_type", "unknown")
    location = value.get("location", "default")
    return f"{device_type}-{location}"

# Use custom key matching
sdf = sdf.join_lookup(
    lookup=lookup,
    on=custom_key_matcher,
    fields={
        "config": lookup.json_field(
            jsonpath="$",
            type="location-config"
        )
    }
)
```
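
As a quick check of the key function on its own, the snippet below calls `custom_key_matcher` on a few hypothetical records, outside of any stream. The record contents are made up for illustration, and the message key is passed as `None` because this matcher ignores it.

```python
def custom_key_matcher(value, key):
    """Build the configuration key from device type and location."""
    device_type = value.get("device_type", "unknown")
    location = value.get("location", "default")
    return f"{device_type}-{location}"


# Hypothetical records: complete, missing location, missing both fields.
records = [
    {"device_type": "thermostat", "location": "berlin", "temperature": 21.7},
    {"device_type": "thermostat", "temperature": 19.2},
    {"temperature": 18.0},
]

lookup_keys = [custom_key_matcher(value, key=None) for value in records]
print(lookup_keys)
# ['thermostat-berlin', 'thermostat-default', 'unknown-default']
```

Records with missing fields fall back to the `unknown` and `default` segments, so a configuration stored under a key such as `unknown-default` would act as a catch-all.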

### Binary Configuration Support

For non-JSON configurations (firmware files, calibration data, etc.), use `bytes_field` to attach the raw content:

```python
sdf = sdf.join_lookup(
    lookup=lookup,
    on="device_id",
    fields={
        "firmware_binary": lookup.bytes_field(
            type="firmware"
        ),
        "calibration_data": lookup.bytes_field(
            type="calibration"
        )
    }
)
```
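
Raw binary content is often awkward to forward to a downstream topic, so one option is to replace it with a compact summary before producing. The sketch below is one possible follow-up step, assuming the binary fields arrive as Python `bytes` (or `None` when no configuration matched); the `summarize_binary_fields` helper and the summary field names are hypothetical.

```python
import hashlib

BINARY_FIELDS = ("firmware_binary", "calibration_data")


def summarize_binary_fields(value):
    """Replace binary fields with their size and SHA-256 digest.

    Assumes each binary field is `bytes`, or None if nothing matched.
    """
    summary = {k: v for k, v in value.items() if k not in BINARY_FIELDS}
    for field in BINARY_FIELDS:
        content = value.get(field) or b""
        summary[f"{field}_size_bytes"] = len(content)
        summary[f"{field}_sha256"] = hashlib.sha256(content).hexdigest()
    return summary


# Hypothetical enriched record, as it might look after the join.
sample = {
    "device_id": "dev-1",
    "firmware_binary": b"\x00\x01\x02",
    "calibration_data": None,
}
print(summarize_binary_fields(sample))
```

In a pipeline, a function like this could be passed to `sdf.apply(...)` after the join, in the same way as the lambda in the basic example.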

### How It Works

1. **Configuration Updates**: When configurations are updated via the Dynamic Configuration API, lightweight Kafka events are published to your configuration topic.

2. **Real-time Enrichment**: The `join_lookup` feature listens to these events, fetches the latest configuration content, and caches it locally.

3. **Stream Enrichment**: As your main data stream processes records, `join_lookup` automatically enriches each record with the appropriate configuration data based on the matching key and timestamp.

4. **Version Management**: The system automatically handles configuration versioning, ensuring that each record is enriched with the configuration version that was valid at the time the record was created.
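
The key-and-timestamp matching described in the steps above can be sketched with a small in-memory cache. This is only an illustration of the idea, not the actual Quix Streams implementation: the `VersionedConfigCache` class, its method names, and the millisecond timestamps are all assumptions made for the example.

```python
from bisect import bisect_right


class VersionedConfigCache:
    """Toy cache that keeps every configuration version per key."""

    def __init__(self):
        self._valid_from = {}  # key -> sorted list of start timestamps
        self._contents = {}    # key -> contents, aligned with _valid_from

    def add_version(self, key, valid_from_ms, content):
        """Store a version that becomes valid at `valid_from_ms`."""
        timestamps = self._valid_from.setdefault(key, [])
        contents = self._contents.setdefault(key, [])
        index = bisect_right(timestamps, valid_from_ms)
        timestamps.insert(index, valid_from_ms)
        contents.insert(index, content)

    def get(self, key, record_ts_ms):
        """Return the version valid at `record_ts_ms`, or None."""
        timestamps = self._valid_from.get(key, [])
        index = bisect_right(timestamps, record_ts_ms)
        if index == 0:
            return None  # unknown key, or record predates every version
        return self._contents[key][index - 1]


cache = VersionedConfigCache()
cache.add_version("dev-1", 1000, {"firmware": {"version": "2.4.0"}})
cache.add_version("dev-1", 2000, {"firmware": {"version": "2.4.1"}})

print(cache.get("dev-1", 1500))  # {'firmware': {'version': '2.4.0'}}
print(cache.get("dev-1", 2000))  # {'firmware': {'version': '2.4.1'}}
print(cache.get("dev-1", 500))   # None
```

Keeping the start timestamps sorted lets each lookup use a binary search, so a record that arrives late is still matched with the version that was valid at its own timestamp rather than with the newest one.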

### Benefits

- **Real-time Updates**: Configuration changes reach your streaming applications as soon as the corresponding update event is consumed
- **Large File Support**: Handle configuration files too large for direct Kafka streaming
- **Version Control**: Each record is matched with the configuration version that was valid at its timestamp
- **Performance**: Local caching minimizes API calls and latency
- **Flexibility**: Support for both JSON and binary configuration content
