Migrate to orjson (JSON library) for better performance #2016

Merged: 3 commits, Aug 11, 2025
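
The change swaps the standard-library json module for orjson across the schema-registry code paths. The whole migration rests on two behavioral differences between the libraries, shown in a minimal standalone sketch (illustration only, not part of the diff):

import json
import orjson

payload = {"type": "record", "name": "Example"}

# orjson.dumps returns bytes and always emits compact UTF-8, so call
# sites that need a str must decode; json.dumps returns str directly.
assert orjson.dumps(payload).decode("utf-8") == json.dumps(payload, separators=(",", ":"))

# orjson.loads accepts str, bytes, bytearray, or memoryview, so it is
# a drop-in replacement for json.loads on schema strings.
assert orjson.loads(orjson.dumps(payload)) == json.loads(json.dumps(payload))
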
1 change: 1 addition & 0 deletions requirements/requirements-schemaregistry.txt
@@ -2,3 +2,4 @@ attrs>=21.2.0
cachetools>=5.5.0
httpx>=0.26
authlib>=1.0.0
+orjson >= 3.10
8 changes: 4 additions & 4 deletions src/confluent_kafka/avro/serializer/message_serializer.py
@@ -20,7 +20,7 @@
# derived from https://github.com/verisign/python-confluent-schemaregistry.git
#
import io
-import json
+import orjson
import logging
import struct
import sys
@@ -80,7 +80,7 @@ def __init__(self, registry_client, reader_key_schema=None, reader_value_schema=
# Encoder support
def _get_encoder_func(self, writer_schema):
if HAS_FAST:
-schema = json.loads(str(writer_schema))
+schema = orjson.loads(str(writer_schema))
parsed_schema = parse_schema(schema)
return lambda record, fp: schemaless_writer(fp, parsed_schema, record)
writer = avro.io.DatumWriter(writer_schema)
@@ -176,9 +176,9 @@ def _get_decoder_func(self, schema_id, payload, is_key=False):
if HAS_FAST:
# try to use fast avro
try:
-fast_avro_writer_schema = parse_schema(json.loads(str(writer_schema_obj)))
+fast_avro_writer_schema = parse_schema(orjson.loads(str(writer_schema_obj)))
if reader_schema_obj is not None:
-fast_avro_reader_schema = parse_schema(json.loads(str(reader_schema_obj)))
+fast_avro_reader_schema = parse_schema(orjson.loads(str(reader_schema_obj)))
else:
fast_avro_reader_schema = None
schemaless_reader(payload, fast_avro_writer_schema)
6 changes: 3 additions & 3 deletions src/confluent_kafka/kafkatest/verifiable_client.py
@@ -14,7 +14,7 @@


import datetime
-import json
+import orjson
import os
import re
import signal
@@ -61,8 +61,8 @@ def err(self, s, term=False):
def send(self, d):
""" Send dict as JSON to stdout for consumtion by kafkatest handler """
d['_time'] = str(datetime.datetime.now())
-self.dbg('SEND: %s' % json.dumps(d))
-sys.stdout.write('%s\n' % json.dumps(d))
+self.dbg('SEND: %s' % orjson.dumps(d).decode("utf-8"))
+sys.stdout.write('%s\n' % orjson.dumps(d).decode("utf-8"))
sys.stdout.flush()

@staticmethod
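
Because orjson.dumps returns bytes, both call sites in send() decode before formatting into a str. The payload is serialized twice as written; a possible micro-simplification (a sketch, not part of this PR) would serialize once and reuse the result:

def send(self, d):
    """ Send dict as JSON to stdout for consumption by kafkatest handler """
    d['_time'] = str(datetime.datetime.now())
    encoded = orjson.dumps(d).decode("utf-8")  # serialize once, bytes -> str
    self.dbg('SEND: %s' % encoded)
    sys.stdout.write('%s\n' % encoded)
    sys.stdout.flush()
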
4 changes: 2 additions & 2 deletions src/confluent_kafka/schema_registry/_async/avro.py
@@ -15,7 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import io
-from json import loads
+import orjson
from typing import Dict, Union, Optional, Callable

from fastavro import schemaless_reader, schemaless_writer
@@ -270,7 +270,7 @@ async def __init_impl(
# i.e. {"type": "string"} has a name of string.
# This function does not comply.
# https://github.com/fastavro/fastavro/issues/415
-schema_dict = loads(schema.schema_str)
+schema_dict = orjson.loads(schema.schema_str)
schema_name = parsed_schema.get("name", schema_dict.get("type"))
else:
schema_name = None
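
The changed line feeds orjson.loads output into the same name-lookup fallback as before: primitive schemas carry no "name" key (the fastavro issue linked in the comment), so the subject name falls back to the schema's "type". A small illustration (standalone, using a primitive schema string as an assumption):

import orjson

schema_dict = orjson.loads('{"type": "string"}')
# Primitive schemas have no "name", so fall back to "type" ("string" here).
schema_name = schema_dict.get("name", schema_dict.get("type"))
assert schema_name == "string"
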
12 changes: 6 additions & 6 deletions src/confluent_kafka/schema_registry/_async/json_schema.py
@@ -15,7 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import io
-import json
+import orjson
from typing import Union, Optional, Tuple, Callable

from cachetools import LRUCache
@@ -66,7 +66,7 @@
for ref in schema.references:
referenced_schema = await schema_registry_client.get_version(ref.subject, ref.version, True)
ref_registry = await _resolve_named_schema(referenced_schema.schema, schema_registry_client, ref_registry)
-referenced_schema_dict = json.loads(referenced_schema.schema.schema_str)
+referenced_schema_dict = orjson.loads(referenced_schema.schema.schema_str)
resource = Resource.from_contents(
referenced_schema_dict, default_specification=DEFAULT_SPEC)
ref_registry = ref_registry.with_resource(ref.name, resource)
@@ -203,7 +203,7 @@
'schema.id.serializer': prefix_schema_id_serializer,
'validate': True}

async def __init_impl(

Check failure (SonarQube-Confluent / confluent-kafka-python Sonarqube Results) at src/confluent_kafka/schema_registry/_async/json_schema.py line 206: Refactor this function to reduce its Cognitive Complexity from 28 to the 15 allowed.
self,
schema_str: Union[str, Schema, None],
schema_registry_client: AsyncSchemaRegistryClient,
@@ -221,7 +221,7 @@
else:
self._schema = None

-self._json_encode = json_encode or json.dumps
+self._json_encode = json_encode or (lambda x: orjson.dumps(x).decode("utf-8"))
self._registry = schema_registry_client
self._rule_registry = (
rule_registry if rule_registry else RuleRegistry.get_global_instance()
@@ -394,7 +394,7 @@
return result

ref_registry = await _resolve_named_schema(schema, self._registry)
-parsed_schema = json.loads(schema.schema_str)
+parsed_schema = orjson.loads(schema.schema_str)

self._parsed_schemas.set(schema, (parsed_schema, ref_registry))
return parsed_schema, ref_registry
@@ -478,7 +478,7 @@
'schema.id.deserializer': dual_schema_id_deserializer,
'validate': True}

async def __init_impl(

Check failure (SonarQube-Confluent / confluent-kafka-python Sonarqube Results) at src/confluent_kafka/schema_registry/_async/json_schema.py line 481: Refactor this function to reduce its Cognitive Complexity from 28 to the 15 allowed.
self,
schema_str: Union[str, Schema, None],
from_dict: Optional[Callable[[dict, SerializationContext], object]] = None,
@@ -510,7 +510,7 @@
self._rule_registry = rule_registry if rule_registry else RuleRegistry.get_global_instance()
self._parsed_schemas = ParsedSchemaCache()
self._validators = LRUCache(1000)
-self._json_decode = json_decode or json.loads
+self._json_decode = json_decode or orjson.loads
self._use_schema_id = None

conf_copy = self._default_conf.copy()
@@ -659,7 +659,7 @@
return result

ref_registry = await _resolve_named_schema(schema, self._registry)
-parsed_schema = json.loads(schema.schema_str)
+parsed_schema = orjson.loads(schema.schema_str)

self._parsed_schemas.set(schema, (parsed_schema, ref_registry))
return parsed_schema, ref_registry
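
A note on the two defaults above: downstream code treats json_encode as a str-returning callable with json.dumps semantics, hence the decoding lambda, while json_decode can take orjson.loads directly because it accepts both str and bytes input. A user-supplied encoder plugs in unchanged; a hypothetical example (the sorted_encode helper and the keyword pass-through are assumptions based on the __init_impl signature, not confirmed public API):

import orjson

def sorted_encode(obj) -> str:
    # orjson configures behavior via option flags rather than keyword
    # arguments such as sort_keys=True.
    return orjson.dumps(obj, option=orjson.OPT_SORT_KEYS).decode("utf-8")

serializer = AsyncJSONSerializer(schema_str, schema_registry_client, json_encode=sorted_encode)
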
src/confluent_kafka/schema_registry/_async/schema_registry_client.py
@@ -16,7 +16,7 @@
# limitations under the License.
#
import asyncio
-import json
+import orjson
import logging
import time
import urllib
@@ -416,7 +416,7 @@ async def send_request(
" application/json"}

if body is not None:
-body = json.dumps(body)
+body = orjson.dumps(body).decode('utf-8')
headers = {'Content-Length': str(len(body)),
'Content-Type': "application/vnd.schemaregistry.v1+json"}

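
One subtlety in send_request: json.dumps defaults to ensure_ascii=True, so the old body's character count always equaled its byte count; orjson has no ensure_ascii and always emits UTF-8, so len(body) on the decoded str undercounts the Content-Length whenever a schema contains non-ASCII characters. A sketch of an alternative (not what the PR does) that keeps the header exact:

if body is not None:
    raw = orjson.dumps(body)                     # bytes, UTF-8
    body = raw.decode('utf-8')                   # str for the HTTP client
    headers = {'Content-Length': str(len(raw)),  # byte count, not char count
               'Content-Type': "application/vnd.schemaregistry.v1+json"}
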
4 changes: 2 additions & 2 deletions src/confluent_kafka/schema_registry/_sync/avro.py
@@ -15,7 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import io
-from json import loads
+import orjson
from typing import Dict, Union, Optional, Callable

from fastavro import schemaless_reader, schemaless_writer
@@ -270,7 +270,7 @@ def __init_impl(
# i.e. {"type": "string"} has a name of string.
# This function does not comply.
# https://github.com/fastavro/fastavro/issues/415
-schema_dict = loads(schema.schema_str)
+schema_dict = orjson.loads(schema.schema_str)
schema_name = parsed_schema.get("name", schema_dict.get("type"))
else:
schema_name = None
12 changes: 6 additions & 6 deletions src/confluent_kafka/schema_registry/_sync/json_schema.py
@@ -15,7 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import io
-import json
+import orjson
from typing import Union, Optional, Tuple, Callable

from cachetools import LRUCache
@@ -66,7 +66,7 @@
for ref in schema.references:
referenced_schema = schema_registry_client.get_version(ref.subject, ref.version, True)
ref_registry = _resolve_named_schema(referenced_schema.schema, schema_registry_client, ref_registry)
-referenced_schema_dict = json.loads(referenced_schema.schema.schema_str)
+referenced_schema_dict = orjson.loads(referenced_schema.schema.schema_str)
resource = Resource.from_contents(
referenced_schema_dict, default_specification=DEFAULT_SPEC)
ref_registry = ref_registry.with_resource(ref.name, resource)
@@ -203,7 +203,7 @@
'schema.id.serializer': prefix_schema_id_serializer,
'validate': True}

def __init_impl(

Check failure (SonarQube-Confluent / confluent-kafka-python Sonarqube Results) at src/confluent_kafka/schema_registry/_sync/json_schema.py line 206: Refactor this function to reduce its Cognitive Complexity from 28 to the 15 allowed.
self,
schema_str: Union[str, Schema, None],
schema_registry_client: SchemaRegistryClient,
@@ -221,7 +221,7 @@
else:
self._schema = None

-self._json_encode = json_encode or json.dumps
+self._json_encode = json_encode or (lambda x: orjson.dumps(x).decode("utf-8"))
self._registry = schema_registry_client
self._rule_registry = (
rule_registry if rule_registry else RuleRegistry.get_global_instance()
@@ -394,7 +394,7 @@
return result

ref_registry = _resolve_named_schema(schema, self._registry)
-parsed_schema = json.loads(schema.schema_str)
+parsed_schema = orjson.loads(schema.schema_str)

self._parsed_schemas.set(schema, (parsed_schema, ref_registry))
return parsed_schema, ref_registry
@@ -478,7 +478,7 @@
'schema.id.deserializer': dual_schema_id_deserializer,
'validate': True}

def __init_impl(

Check failure (SonarQube-Confluent / confluent-kafka-python Sonarqube Results) at src/confluent_kafka/schema_registry/_sync/json_schema.py line 481: Refactor this function to reduce its Cognitive Complexity from 28 to the 15 allowed.
self,
schema_str: Union[str, Schema, None],
from_dict: Optional[Callable[[dict, SerializationContext], object]] = None,
@@ -510,7 +510,7 @@
self._rule_registry = rule_registry if rule_registry else RuleRegistry.get_global_instance()
self._parsed_schemas = ParsedSchemaCache()
self._validators = LRUCache(1000)
-self._json_decode = json_decode or json.loads
+self._json_decode = json_decode or orjson.loads
self._use_schema_id = None

conf_copy = self._default_conf.copy()
Expand Down Expand Up @@ -659,7 +659,7 @@
return result

ref_registry = _resolve_named_schema(schema, self._registry)
-parsed_schema = json.loads(schema.schema_str)
+parsed_schema = orjson.loads(schema.schema_str)

self._parsed_schemas.set(schema, (parsed_schema, ref_registry))
return parsed_schema, ref_registry
src/confluent_kafka/schema_registry/_sync/schema_registry_client.py
@@ -16,7 +16,7 @@
# limitations under the License.
#

-import json
+import orjson
import logging
import time
import urllib
@@ -416,7 +416,7 @@ def send_request(
" application/json"}

if body is not None:
-body = json.dumps(body)
+body = orjson.dumps(body).decode('utf-8')
headers = {'Content-Length': str(len(body)),
'Content-Type': "application/vnd.schemaregistry.v1+json"}

4 changes: 2 additions & 2 deletions src/confluent_kafka/schema_registry/common/avro.py
@@ -3,7 +3,7 @@
from collections import defaultdict
from copy import deepcopy
from io import BytesIO
-from json import loads
+import orjson
from typing import Dict, Union, Optional, Set

from fastavro import repository, validate
@@ -91,7 +91,7 @@ def load(self, subject):

def parse_schema_with_repo(schema_str: str, named_schemas: Dict[str, AvroSchema]) -> AvroSchema:
copy = deepcopy(named_schemas)
copy["$root"] = loads(schema_str)
copy["$root"] = orjson.loads(schema_str)
repo = LocalSchemaRepository(copy)
return load_schema("$root", repo=repo)

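
A closing note on drop-in safety: orjson.loads returns the same plain Python containers json.loads does, which is all the repository loader above needs, but serialization diverges on a few edge cases worth keeping in mind for this migration; a minimal sketch:

import json
import orjson

# json.dumps emits the non-standard NaN token; orjson serializes
# NaN and Infinity as null instead.
assert json.dumps(float("nan")) == "NaN"
assert orjson.dumps(float("nan")) == b"null"

# orjson enforces a 64-bit range on integers; json does not.
try:
    orjson.dumps(2 ** 70)
except TypeError:
    pass  # orjson raises for integers outside the 64-bit range
assert json.dumps(2 ** 70) == str(2 ** 70)
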