v3.18.0
What's Changed
💎 Join Lookup: PostgreSQL
Added a Lookup join implementation for enriching streaming data with data from a Postgres database.
The new PostgresLookup allows querying a Postgres database for each field, using a persistent connection and per-field caching based on a configurable TTL.
The cache is a "Least Recently Used" (LRU) cache with a configurable maximum size.
See PostgresLookup API docs for more info.
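The caching behaviour described above (an LRU cache with a maximum size whose entries also expire after a TTL) can be pictured with a small standalone sketch. This is not the PostgresLookup implementation: the class name `TTLLRUCache`, its methods, and the injectable `clock` parameter are all hypothetical and exist only to illustrate how size-based eviction and time-based expiry can combine.

```python
import time
from collections import OrderedDict
from typing import Any, Callable, Hashable, Tuple


class TTLLRUCache:
    """Illustrative only: an LRU cache whose entries expire after `ttl` seconds."""

    def __init__(
        self,
        maxsize: int,
        ttl: float,
        clock: Callable[[], float] = time.monotonic,  # injectable for testing
    ):
        self._maxsize = maxsize
        self._ttl = ttl
        self._clock = clock
        # Maps key -> (expiry timestamp, value); order tracks recency of use.
        self._data: "OrderedDict[Hashable, Tuple[float, Any]]" = OrderedDict()

    def get(self, key: Hashable, default: Any = None) -> Any:
        entry = self._data.get(key)
        if entry is None:
            return default
        expires_at, value = entry
        if self._clock() >= expires_at:
            # Entry is stale: drop it so the caller re-queries the database.
            del self._data[key]
            return default
        # Mark the key as most recently used.
        self._data.move_to_end(key)
        return value

    def put(self, key: Hashable, value: Any) -> None:
        self._data[key] = (self._clock() + self._ttl, value)
        self._data.move_to_end(key)
        # Evict least recently used entries once the size limit is exceeded.
        while len(self._data) > self._maxsize:
            self._data.popitem(last=False)

    def __len__(self) -> int:
        return len(self._data)
```

In a sketch like this, a lookup would first call `get()` with the record's join key and only query the database on a miss, storing the result with `put()`. A larger maximum size reduces database round trips at the cost of memory, while a shorter TTL bounds how stale the enriched data can become.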
from quixstreams import Application
from quixstreams.dataframe.joins.lookups.postgresql import PostgresLookup
app = Application(...)
sdf = app.dataframe(...)
# Initialize PostgresLookup with Postgres credentials
lookup = PostgresLookup(
    host="<host>",
    port=5432,
    dbname="<db>",
    user="<user>",
    password="<password>",
    cache_size=1000,
)
# Add columns "table_column1" and "table_column2" from "my_table" to the Kafka record as a new field "joined".
# Match by comparing "my_record_field" on the left and "table_column1" on the right.
fields = {
    "joined": lookup.field(
        table="my_table",
        columns=["table_column1", "table_column2"],
        on="table_column1",
    ),
}
sdf = sdf.join_lookup(lookup, fields, on="my_record_field")
app.run()

🦠 Bugfixes
- Fix typo in Producer by @gwaramadze in #953
Full Changelog: v3.17.0...v3.18.0