
materialize-clickhouse: add ClickHouse materialization connector#4022

Open
jacobmarble wants to merge 35 commits into main from jgm-clickhouse

Conversation

@jacobmarble
Contributor

@jacobmarble jacobmarble commented Mar 17, 2026

Description:

Adds materialize-clickhouse.

Closes #783

Workflow steps:

Configuration is similar to other materialize types.

Documentation links affected:

Documentation added in estuary/flow#2776

Notes for reviewers:

Quirks

ClickHouse is append-only, so delta updates cannot be implemented as database-level operations. To update an existing record, we insert a new version (column _version) of the record (a record with the same primary key aka order by key), and leave the old record as-is. To delete an existing record, we insert a tombstone record, which is simply a record with column _is_deleted = 1.

In a background process, the ReplacingMergeTree engine removes duplicate records AND deleted records after a non-deterministic period of time. Until then, queries return all versions of the record.

To mitigate this, we add the FINAL qualifier to our own Load queries. This causes the query engine to deduplicate primary keys at query time, but does not cause it to remove deleted records from results. (Weird, right?) Hence, the WHERE _is_deleted = 0 clause.
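The mechanics above can be sketched in DDL. This is a minimal illustration, not the connector's actual schema; the table and column names are made up for the example:

```sql
-- Illustrative table: every write is an INSERT. Updates are new rows
-- with a higher _version; deletes are rows with _is_deleted = 1.
CREATE TABLE example_binding (
    key         String,
    doc         String,
    _version    UInt64,
    _is_deleted UInt8
)
ENGINE = ReplacingMergeTree(_version, _is_deleted)
ORDER BY key;

-- Load query: FINAL deduplicates rows sharing an ORDER BY key at
-- query time; the predicate filters out tombstone rows.
SELECT key, doc
FROM example_binding FINAL
WHERE _is_deleted = 0;
```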

Please Review Closely

I'm uncertain about the way that soft delete and hard delete are coupled. What does the user expect when a document is soft-deleted?

Other materialize implementations' load queries fetch the binding as well as the document identifier. I'm not sure what these things are yet, so perhaps a good topic for 1:1 conversation.

Performance

The Load() method is relatively naive compared to others - it synchronously executes a SELECT query per loaded key. I've left it this way for now so that we can improve performance empirically when required.

@jacobmarble
Contributor Author

I'm a little concerned about the connector state variable. We need to guarantee monotonically increasing version numbers per record, and I'm just storing a version value in the state.

What if multiple tasks implement a single materialize? Is that state object per-task, or have I introduced a race condition?

It might be better to track version per record, will consider it today.

Implements a Go materialization connector for ClickHouse using
ReplacingMergeTree tables with INSERT-only upsert semantics. Key design
choices:

- ReplacingMergeTree exclusively (no delta/MergeTree mode)
- Native bulk inserts via clickhouse-go PrepareBatch
- Includes offline tests, online tests against live ClickHouse, and
  full integration tests

Hard-delete rows now use the full record values from ConvertAll instead
of synthesizing typed zero values per column. ReplacingMergeTree only
needs _is_deleted=1 to hide rows from FINAL queries, so the actual
column values in delete rows don't matter. This simplifies the code by
removing the tombstoneValue helper and its type-specific zero map.

Replace the sequential single-batch approach with a map of per-binding
batches. Rows are appended during the Store loop and all batches are
flushed in the StartCommitFunc callback, allowing interleaved bindings
and deferring the network send to commit time.

Remove connectorState and the incrementing version counter persisted in
the Flow recovery log. Instead, always write _version=0 and rely on
ReplacingMergeTree's documented tie-breaking rule: when ver is equal for
multiple rows, the most recently inserted row wins.

This eliminates the checkpoint JSON serialization, UnmarshalState logic,
and the associated state management complexity.
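A minimal illustration of that tie-break, with made-up names; the behavior relied on here is the one the ReplacingMergeTree documentation describes for equal ver values:

```sql
CREATE TABLE tie_break_demo (
    key         String,
    doc         String,
    _version    UInt64,
    _is_deleted UInt8
)
ENGINE = ReplacingMergeTree(_version, _is_deleted)
ORDER BY key;

INSERT INTO tie_break_demo VALUES ('k1', '{"v":"old"}', 0, 0);
INSERT INTO tie_break_demo VALUES ('k1', '{"v":"new"}', 0, 0);

-- Per the documented rule, when ver is equal the most recently
-- inserted row wins, so FINAL should return the "new" document.
SELECT doc FROM tie_break_demo FINAL WHERE key = 'k1';
```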

Enable automatic background CLEANUP merges on ReplacingMergeTree tables
via SETTINGS block in CREATE TABLE. This ensures deleted rows are
eventually purged without requiring manual OPTIMIZE ... FINAL CLEANUP.

Also adds a README documenting ClickHouse table mechanics, the
ReplacingMergeTree approach for updates/deletes, and illustrated
examples of merge behavior. Bumps docker-compose ClickHouse to 25.8.
@jacobmarble
Contributor Author

I've made some important changes since I opened the PR.

Deleting records is simplified. Rather than replace all record values with zero values, we set _is_deleted = 1. Also enabled automatic background CLEANUP merges so that those records are actually removed by the merge background process.

The Load() method now loads up to 1000 records per query. I had simplified this to load just one record per query, but went back after learning how the load iterator scales.

The Store() method now accumulates batches per binding, and commits those together in the returned StartCommitFunc(), which resonates with the Transactor interface better.

Record version is always zero so that version reconciliation depends on the age of the part, not the value of the version. This shifts the logical clock burden to ClickHouse, where it likely doesn't add meaningful load.

Added a README for humans.

@danielnelson
Contributor

This might be imprecise, since it just reflects my current understanding:

ClickHouse is append-only, so delta updates cannot be implemented as database-level operations.

I wonder if it makes sense to only operate in delta update mode. In this mode, there is no Load operation and we write all changes in the Store phase. You can see this mode in the filesink materializations, such as materialize-s3-parquet, and I think it can be enabled optionally for any materialization binding.

The Load query could still have value: it would reduce the size of any deltas. So maybe it's a question of what the default mode is?

Other materialize implementations' load queries fetch the binding as well as the document identifier. I'm not sure what these things are yet, so perhaps a good topic for 1:1 conversation.

The binding is a number that identifies the binding, in case there is more than one collection being written. The document identifier is usually the flow_document field, which is a projection of the full document as it exists in the collection (see the example below). The runtime can compare this document, or use another reduction method, against what already exists to determine which rows to write.

There also exists a no_flow_document feature flag that most materializations support, where this field is not written and the load query recreates it from the other fields. This reduces the size of the data written, as in normal mode the data is essentially written a second time for each record.

I'm uncertain about the way that soft delete and hard delete are coupled. What does the user expect when a document is soft-deleted?

Soft-delete is a tombstone and hard-delete is actually deleted, so _is_deleted sounds like hard delete. For soft delete, say you have a table like:

[+]postgres=> select * from name_role;
 name  |  role   
-------+---------
 alice | manager

A normal OLTP materialization (with flow_document) will look something like this:

[+]postgres=> select * from mat_name_role;
 name  | _meta/op |       flow_published_at       |  role   |                                                                                   flow_document                                                                                    
-------+----------+-------------------------------+---------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 alice | c        | 2026-03-17 15:15:57.852371-07 | manager | {"_meta":{"op":"c","source":{"loc":[-1,0,0],"schema":"public","snapshot":true,"table":"name_role"},"uuid":"df8c9c3f-224e-11f1-8001-53570e99f5c0"},"name":"alice","role":"manager"}
(1 row)

Then if this record is deleted, op changes to d:

[+]postgres=> select * from mat_name_role;
 name  | _meta/op |       flow_published_at       |  role   |                                                                                                      flow_document                                                                                                      
-------+----------+-------------------------------+---------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 alice | d        | 2026-03-17 15:18:07.877306-07 | manager | {"_meta":{"op":"d","source":{"loc":[25940456,25940624,25940768],"schema":"public","table":"name_role","ts_ms":1773785887903,"txid":861},"uuid":"2d0cd741-224f-11f1-8001-53570e99f5c0"},"name":"alice","role":"manager"}
(1 row)

I would think to soft delete we would just append the op: d row.


CI expects container name materialize-clickhouse-db-1 but the service was
named 'clickhouse', producing materialize-clickhouse-clickhouse-1.

The dedicated _version UInt64 column (always set to 0) is replaced by
flow_published_at, a standard Estuary metadata timestamp that naturally
increases with each transaction. This simplifies the schema and makes
version ordering explicit rather than relying on part insertion order.

Also corrects README record counts, FINAL behavior description (FINAL
both deduplicates and filters deleted rows when is_deleted is configured),
and removes the WHERE _is_deleted = 0 predicate from load queries since
FINAL already handles it.
@jacobmarble
Contributor Author

I've added batching limits today - flush batches at 100,000 records or one minute, whichever comes first.

Load keys are now bulk-inserted into a session-scoped temporary table via
PrepareBatch, then joined against the target table to fetch existing
documents. Each binding gets its own single-connection native pool to
guarantee temp table visibility. This eliminates the GroupSet parameter
binding limitation and supports much larger key batches (100k vs 1k).
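A sketch of the shape of that load path, with illustrative names. A ClickHouse temporary table is only visible to the session that created it, which is why each binding needs its own single-connection pool:

```sql
-- Session-scoped temp table holding the keys to load.
CREATE TEMPORARY TABLE load_keys (key String);

-- Keys are bulk-inserted into load_keys via PrepareBatch, then joined
-- against the target to fetch only the documents that already exist.
SELECT t.key, t.doc
FROM target_table AS t FINAL
INNER JOIN load_keys AS k ON t.key = k.key;
```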

Replace openDB, openNativeConn, and openNativeConnSingle with a single
newClickhouseOptions() that returns *clickhouse.Options, letting callers
configure pool settings and open connections directly.
@mdibaiee
Member

The general structure of the connector looks good! Thank you!

I left some inline comments about specific parts, but two main issues need to be addressed:

Deletions

I think we should re-use the existing _meta/op: "d" instead of _is_deleted for deletions. One way to do this is:

  1. In Validate, the connector should signal that _meta/op is a column that must be materialized (this is called putting a "constraint" on a column) -- this means the connector asks that this column must always be materialized and the user cannot disable / exclude this column. This then means we are guaranteed to be always materializing this column.
  2. Update the code so that instead of checking for _is_deleted, we check for _meta/op column being equal to the string d for deletions

This may or may not work with ClickHouse; I'm not sure how flexible its system is for deletions, but let me know what you think.

Transactions

Currently the Store phase is inserting documents in batches, but there is no transactionality: i.e. if the Flow transaction fails at any point in time, the documents that have been inserted are not going to be rolled back. Further, documents should not be visible in the destination system until the whole transaction can be committed at the same time.

We most likely want to use the post-commit apply pattern in ClickHouse as well, so: stage inserts in a temporary table, and in Acknowledge move them over to the destination tables, or something along these lines. We can discuss more about this to better understand the constraints of ClickHouse and see how we can best achieve transactionality.
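One possible shape of that pattern, with illustrative names; note that INSERT ... SELECT followed by TRUNCATE is not itself atomic, so a real design would need more care here:

```sql
-- Store phase: rows land in a staging table and stay invisible to
-- readers of the target.
INSERT INTO staging_binding (key, doc, _version, _is_deleted)
VALUES ('k1', '{"v":1}', 0, 0);

-- Acknowledge phase: move staged rows into the target, then clear
-- the staging table for the next transaction.
INSERT INTO target_binding SELECT * FROM staging_binding;
TRUNCATE TABLE staging_binding;
```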

@jacobmarble
Contributor Author

I can improve the _is_deleted column a bit by making it materialized; in other words, the value will be computed automatically and can be ignored by the Go code:

_is_deleted = (_meta/op = 'd' ? 1 : 0)

In order to atomically store more than one batch (100,000) of documents, ClickHouse does provide a method, but it involves writing to a staging table that lives on disk, then migrating the inserted partitions to the target table. A plain INSERT ... SELECT ... is very inefficient by comparison.
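In ClickHouse DDL, that materialized column could look something like the sketch below. The table and column layout is illustrative, using flow_published_at as the version column per the earlier change:

```sql
CREATE TABLE example_binding (
    key               String,
    `_meta/op`        String,
    doc               String,
    flow_published_at DateTime64(3),
    -- Derived automatically on insert; the Go code never writes it.
    _is_deleted       UInt8 MATERIALIZED if(`_meta/op` = 'd', 1, 0)
)
ENGINE = ReplacingMergeTree(flow_published_at, _is_deleted)
ORDER BY key;
```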

@mdibaiee
Member

@jacobmarble regarding _is_deleted, the real concern is avoiding an extra column in the customer's table beyond what is standard; ideally we want to implement deletions using only the existing _meta/op column.


Development

Successfully merging this pull request may close these issues.

Request a connector to materialize to ClickHouse
