feat(codecs): fetch Avro schema dynamically from Confluent Schema Registry #25093
omwbennett wants to merge 2 commits into vectordotdev:master
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 05bb92c5b1
tokio::task::block_in_place(|| {
    tokio::runtime::Handle::current().block_on(async {
        let resp = http_client
Avoid panic on single-thread Tokio runtimes
Calling tokio::task::block_in_place here will panic when the decoder runs on a current_thread runtime, and the nested Handle::current() call also panics if parsing happens outside Tokio altogether. This means schema-registry decoding can crash the process (instead of returning an error) on the first uncached schema ID in contexts that do use current-thread runtimes in this repo (for example component-validation runners), so schema fetch should avoid block_in_place/Handle::current assumptions and use a non-panicking path.
Summary
Adds support for fetching Avro schemas dynamically from a Confluent Schema Registry, closing #19546.
Previously, the Avro decoder required an inline schema to be provided in the Vector config. Schemas are often stored in a registry, so users had to fetch them when generating the config, either manually or with custom config generation tooling. It also meant Vector could not handle compatible schema evolution, where a topic may contain messages serialized with different schema versions.
With this change, a schema_registry block can be specified instead of an inline schema. It tells the decoder to fetch the correct schema using the schema ID embedded in each message (according to the Confluent wire format).
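For readers unfamiliar with the Confluent wire format, the sketch below shows how the schema ID could be extracted from a message: one magic byte (0x00), a 4-byte big-endian schema ID, then the Avro payload. The function name, the error type, and the choice of u32 for the ID are assumptions made for illustration and are not taken from this PR's code.

```rust
/// Confluent wire format: 1 magic byte (0x00), a 4-byte big-endian
/// schema ID, then the Avro-encoded payload.
const MAGIC_BYTE: u8 = 0x00;
const PREFIX_LEN: usize = 5;

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
pub enum PrefixError {
    /// The message is shorter than the 5-byte prefix; holds the actual length.
    TooShort(usize),
    /// The first byte is not the expected magic byte; holds the byte found.
    BadMagic(u8),
}

/// Hypothetical helper: splits a message into (schema_id, avro_payload)
/// and returns an error instead of panicking on malformed input.
pub fn split_confluent_prefix(msg: &[u8]) -> Result<(u32, &[u8]), PrefixError> {
    if msg.len() < PREFIX_LEN {
        return Err(PrefixError::TooShort(msg.len()));
    }
    if msg[0] != MAGIC_BYTE {
        return Err(PrefixError::BadMagic(msg[0]));
    }
    let id = u32::from_be_bytes([msg[1], msg[2], msg[3], msg[4]]);
    Ok((id, &msg[PREFIX_LEN..]))
}
```

The returned ID is what a registry lookup would be keyed on; the remaining slice is what the Avro decoder would then parse with the fetched schema.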
Schemas are fetched on first use and cached locally by schema ID, so subsequent messages with the same schema ID incur no network overhead.
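The fetch-on-first-use behaviour can be sketched roughly as follows. This is a single-threaded illustration only: SchemaCache, get_or_fetch, and the fetch closure are hypothetical names, the type parameter S stands in for a parsed Avro schema, and the actual implementation in this PR may differ (for example in how it handles concurrent access).

```rust
use std::collections::HashMap;

/// Hypothetical cache keyed by schema ID; `S` stands in for a parsed schema.
pub struct SchemaCache<S> {
    schemas: HashMap<u32, S>,
}

impl<S> SchemaCache<S> {
    pub fn new() -> Self {
        Self { schemas: HashMap::new() }
    }

    /// Returns the cached schema for `id`, calling `fetch` only on a miss.
    /// A failed fetch is returned to the caller and nothing is cached,
    /// so a later message with the same ID triggers another attempt.
    pub fn get_or_fetch<E>(
        &mut self,
        id: u32,
        fetch: impl FnOnce(u32) -> Result<S, E>,
    ) -> Result<&S, E> {
        if !self.schemas.contains_key(&id) {
            let schema = fetch(id)?;
            self.schemas.insert(id, schema);
        }
        Ok(&self.schemas[&id])
    }
}
```

With this shape, only the first message for a given schema ID pays the network cost; every later lookup is a plain map read.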
Vector configuration
An example config:
How did you test this PR?
Change Type
Is this a breaking change?
Does this PR include user facing changes?
References
Closes: #19546
Notes
The schema and schema_registry fields are mutually exclusive. Specifying both will return an error at startup. When schema_registry is configured, the strip_schema_id_prefix option is ignored, as the wire-format prefix is always consumed to extract the schema ID.
Schema fetches are performed synchronously inside Deserializer::parse() using tokio::task::block_in_place, which isn't ideal, as many threads could potentially block up to the request timeout if the schema registry were unavailable. Fixing this is a bit trickier: parse() is not async, so it would require a larger refactor.
The impact seems minimal, as schemas are fetched at most once per unique schema ID and cached indefinitely, so blocking only occurs on the first message for each schema. max_blocking_threads is also set quite high (20k), so we shouldn't end up blocking all threads.
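As an illustration of the mutual-exclusivity rule between schema and schema_registry described in these notes, a startup check might look something like the sketch below. The struct, enum, and function names are hypothetical simplifications and not Vector's actual config types. The error for the case where neither field is set is also an assumption.

```rust
/// Hypothetical, simplified view of the decoder options.
#[derive(Debug, Default)]
pub struct AvroOptions {
    /// Inline Avro schema, if provided.
    pub schema: Option<String>,
    /// Registry URL standing in for the whole schema_registry block.
    pub schema_registry_url: Option<String>,
}

/// Where the decoder should get its schema from.
#[derive(Debug, PartialEq)]
pub enum SchemaSource {
    Inline(String),
    Registry(String),
}

/// Rejects configs that set both fields, returning an error at startup
/// rather than silently choosing one of them.
pub fn resolve_schema_source(opts: &AvroOptions) -> Result<SchemaSource, String> {
    match (&opts.schema, &opts.schema_registry_url) {
        (Some(_), Some(_)) => {
            Err("`schema` and `schema_registry` are mutually exclusive".to_string())
        }
        (Some(schema), None) => Ok(SchemaSource::Inline(schema.clone())),
        (None, Some(url)) => Ok(SchemaSource::Registry(url.clone())),
        // Assumption: a config with neither field is also rejected.
        (None, None) => Err("one of `schema` or `schema_registry` is required".to_string()),
    }
}
```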