From 0bf6297e9087ae49b7d3033c8ec639cfb47e9fab Mon Sep 17 00:00:00 2001 From: Naxin Date: Wed, 6 Aug 2025 16:25:56 -0400 Subject: [PATCH 1/3] update --- requirements/requirements-schemaregistry.txt | 1 + .../avro/serializer/message_serializer.py | 8 ++++---- src/confluent_kafka/kafkatest/verifiable_client.py | 6 +++--- src/confluent_kafka/schema_registry/_async/avro.py | 4 ++-- .../schema_registry/_async/json_schema.py | 12 ++++++------ .../schema_registry/_async/schema_registry_client.py | 4 ++-- src/confluent_kafka/schema_registry/_sync/avro.py | 4 ++-- .../schema_registry/_sync/json_schema.py | 12 ++++++------ .../schema_registry/_sync/schema_registry_client.py | 4 ++-- src/confluent_kafka/schema_registry/common/avro.py | 4 ++-- 10 files changed, 30 insertions(+), 29 deletions(-) diff --git a/requirements/requirements-schemaregistry.txt b/requirements/requirements-schemaregistry.txt index f6e9d5afe..acf18c570 100644 --- a/requirements/requirements-schemaregistry.txt +++ b/requirements/requirements-schemaregistry.txt @@ -2,3 +2,4 @@ attrs>=21.2.0 cachetools>=5.5.0 httpx>=0.26 authlib>=1.0.0 +orjson >= 3.10,<4 diff --git a/src/confluent_kafka/avro/serializer/message_serializer.py b/src/confluent_kafka/avro/serializer/message_serializer.py index d92763e2c..a239ee109 100644 --- a/src/confluent_kafka/avro/serializer/message_serializer.py +++ b/src/confluent_kafka/avro/serializer/message_serializer.py @@ -20,7 +20,7 @@ # derived from https://github.com/verisign/python-confluent-schemaregistry.git # import io -import json +import orjson import logging import struct import sys @@ -80,7 +80,7 @@ def __init__(self, registry_client, reader_key_schema=None, reader_value_schema= # Encoder support def _get_encoder_func(self, writer_schema): if HAS_FAST: - schema = json.loads(str(writer_schema)) + schema = orjson.loads(str(writer_schema)) parsed_schema = parse_schema(schema) return lambda record, fp: schemaless_writer(fp, parsed_schema, record) writer = avro.io.DatumWriter(writer_schema) @@ -176,9 +176,9 @@ def _get_decoder_func(self, schema_id, payload, is_key=False): if HAS_FAST: # try to use fast avro try: - fast_avro_writer_schema = parse_schema(json.loads(str(writer_schema_obj))) + fast_avro_writer_schema = parse_schema(orjson.loads(str(writer_schema_obj))) if reader_schema_obj is not None: - fast_avro_reader_schema = parse_schema(json.loads(str(reader_schema_obj))) + fast_avro_reader_schema = parse_schema(orjson.loads(str(reader_schema_obj))) else: fast_avro_reader_schema = None schemaless_reader(payload, fast_avro_writer_schema) diff --git a/src/confluent_kafka/kafkatest/verifiable_client.py b/src/confluent_kafka/kafkatest/verifiable_client.py index 56d4383e3..c42078b1e 100644 --- a/src/confluent_kafka/kafkatest/verifiable_client.py +++ b/src/confluent_kafka/kafkatest/verifiable_client.py @@ -14,7 +14,7 @@ import datetime -import json +import orjson import os import re import signal @@ -61,8 +61,8 @@ def err(self, s, term=False): def send(self, d): """ Send dict as JSON to stdout for consumtion by kafkatest handler """ d['_time'] = str(datetime.datetime.now()) - self.dbg('SEND: %s' % json.dumps(d)) - sys.stdout.write('%s\n' % json.dumps(d)) + self.dbg('SEND: %s' % orjson.dumps(d)) + sys.stdout.write('%s\n' % orjson.dumps(d)) sys.stdout.flush() @staticmethod diff --git a/src/confluent_kafka/schema_registry/_async/avro.py b/src/confluent_kafka/schema_registry/_async/avro.py index fc8ff0749..a311c0363 100644 --- a/src/confluent_kafka/schema_registry/_async/avro.py +++ b/src/confluent_kafka/schema_registry/_async/avro.py @@ -15,7 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import io -from json import loads +import orjson from typing import Dict, Union, Optional, Callable from fastavro import schemaless_reader, schemaless_writer @@ -270,7 +270,7 @@ async def __init_impl( # i.e. {"type": "string"} has a name of string. # This function does not comply. # https://github.com/fastavro/fastavro/issues/415 - schema_dict = loads(schema.schema_str) + schema_dict = orjson.loads(schema.schema_str) schema_name = parsed_schema.get("name", schema_dict.get("type")) else: schema_name = None diff --git a/src/confluent_kafka/schema_registry/_async/json_schema.py b/src/confluent_kafka/schema_registry/_async/json_schema.py index 99c1f7d59..8cacf3d79 100644 --- a/src/confluent_kafka/schema_registry/_async/json_schema.py +++ b/src/confluent_kafka/schema_registry/_async/json_schema.py @@ -15,7 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import io -import json +import orjson from typing import Union, Optional, Tuple, Callable from cachetools import LRUCache @@ -66,7 +66,7 @@ async def _resolve_named_schema( for ref in schema.references: referenced_schema = await schema_registry_client.get_version(ref.subject, ref.version, True) ref_registry = await _resolve_named_schema(referenced_schema.schema, schema_registry_client, ref_registry) - referenced_schema_dict = json.loads(referenced_schema.schema.schema_str) + referenced_schema_dict = orjson.loads(referenced_schema.schema.schema_str) resource = Resource.from_contents( referenced_schema_dict, default_specification=DEFAULT_SPEC) ref_registry = ref_registry.with_resource(ref.name, resource) @@ -221,7 +221,7 @@ async def __init_impl( else: self._schema = None - self._json_encode = json_encode or json.dumps + self._json_encode = json_encode or orjson.dumps self._registry = schema_registry_client self._rule_registry = ( rule_registry if rule_registry else RuleRegistry.get_global_instance() @@ -394,7 +394,7 @@ async def _get_parsed_schema(self, schema: Schema) -> Tuple[Optional[JsonSchema] return result ref_registry = await _resolve_named_schema(schema, self._registry) - parsed_schema = json.loads(schema.schema_str) + parsed_schema = orjson.loads(schema.schema_str) self._parsed_schemas.set(schema, (parsed_schema, ref_registry)) return parsed_schema, ref_registry @@ -510,7 +510,7 @@ async def __init_impl( self._rule_registry = rule_registry if rule_registry else RuleRegistry.get_global_instance() self._parsed_schemas = ParsedSchemaCache() self._validators = LRUCache(1000) - self._json_decode = json_decode or json.loads + self._json_decode = json_decode or orjson.loads self._use_schema_id = None conf_copy = self._default_conf.copy() @@ -659,7 +659,7 @@ async def _get_parsed_schema(self, schema: Schema) -> Tuple[Optional[JsonSchema] return result ref_registry = await _resolve_named_schema(schema, self._registry) - parsed_schema = json.loads(schema.schema_str) + parsed_schema = orjson.loads(schema.schema_str) self._parsed_schemas.set(schema, (parsed_schema, ref_registry)) return parsed_schema, ref_registry diff --git a/src/confluent_kafka/schema_registry/_async/schema_registry_client.py b/src/confluent_kafka/schema_registry/_async/schema_registry_client.py index 72f26a106..9df5b295f 100644 --- a/src/confluent_kafka/schema_registry/_async/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_async/schema_registry_client.py @@ -16,7 +16,7 @@ # limitations under the License. # import asyncio -import json +import orjson import logging import time import urllib @@ -416,7 +416,7 @@ async def send_request( " application/json"} if body is not None: - body = json.dumps(body) + body = orjson.dumps(body) headers = {'Content-Length': str(len(body)), 'Content-Type': "application/vnd.schemaregistry.v1+json"} diff --git a/src/confluent_kafka/schema_registry/_sync/avro.py b/src/confluent_kafka/schema_registry/_sync/avro.py index 78e7dd8ea..fbb5b6856 100644 --- a/src/confluent_kafka/schema_registry/_sync/avro.py +++ b/src/confluent_kafka/schema_registry/_sync/avro.py @@ -15,7 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import io -from json import loads +import orjson from typing import Dict, Union, Optional, Callable from fastavro import schemaless_reader, schemaless_writer @@ -270,7 +270,7 @@ def __init_impl( # i.e. {"type": "string"} has a name of string. # This function does not comply. # https://github.com/fastavro/fastavro/issues/415 - schema_dict = loads(schema.schema_str) + schema_dict = orjson.loads(schema.schema_str) schema_name = parsed_schema.get("name", schema_dict.get("type")) else: schema_name = None diff --git a/src/confluent_kafka/schema_registry/_sync/json_schema.py b/src/confluent_kafka/schema_registry/_sync/json_schema.py index 88da6322c..3979890a1 100644 --- a/src/confluent_kafka/schema_registry/_sync/json_schema.py +++ b/src/confluent_kafka/schema_registry/_sync/json_schema.py @@ -15,7 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import io -import json +import orjson from typing import Union, Optional, Tuple, Callable from cachetools import LRUCache @@ -66,7 +66,7 @@ def _resolve_named_schema( for ref in schema.references: referenced_schema = schema_registry_client.get_version(ref.subject, ref.version, True) ref_registry = _resolve_named_schema(referenced_schema.schema, schema_registry_client, ref_registry) - referenced_schema_dict = json.loads(referenced_schema.schema.schema_str) + referenced_schema_dict = orjson.loads(referenced_schema.schema.schema_str) resource = Resource.from_contents( referenced_schema_dict, default_specification=DEFAULT_SPEC) ref_registry = ref_registry.with_resource(ref.name, resource) @@ -221,7 +221,7 @@ def __init_impl( else: self._schema = None - self._json_encode = json_encode or json.dumps + self._json_encode = json_encode or orjson.dumps self._registry = schema_registry_client self._rule_registry = ( rule_registry if rule_registry else RuleRegistry.get_global_instance() @@ -394,7 +394,7 @@ def _get_parsed_schema(self, schema: Schema) -> Tuple[Optional[JsonSchema], Opti return result ref_registry = _resolve_named_schema(schema, self._registry) - parsed_schema = json.loads(schema.schema_str) + parsed_schema = orjson.loads(schema.schema_str) self._parsed_schemas.set(schema, (parsed_schema, ref_registry)) return parsed_schema, ref_registry @@ -510,7 +510,7 @@ def __init_impl( self._rule_registry = rule_registry if rule_registry else RuleRegistry.get_global_instance() self._parsed_schemas = ParsedSchemaCache() self._validators = LRUCache(1000) - self._json_decode = json_decode or json.loads + self._json_decode = json_decode or orjson.loads self._use_schema_id = None conf_copy = self._default_conf.copy() @@ -659,7 +659,7 @@ def _get_parsed_schema(self, schema: Schema) -> Tuple[Optional[JsonSchema], Opti return result ref_registry = _resolve_named_schema(schema, self._registry) - parsed_schema = json.loads(schema.schema_str) + parsed_schema = orjson.loads(schema.schema_str) self._parsed_schemas.set(schema, (parsed_schema, ref_registry)) return parsed_schema, ref_registry diff --git a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py index 3be266538..bf0f6825b 100644 --- a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py @@ -16,7 +16,7 @@ # limitations under the License. # -import json +import orjson import logging import time import urllib @@ -416,7 +416,7 @@ def send_request( " application/json"} if body is not None: - body = json.dumps(body) + body = orjson.dumps(body) headers = {'Content-Length': str(len(body)), 'Content-Type': "application/vnd.schemaregistry.v1+json"} diff --git a/src/confluent_kafka/schema_registry/common/avro.py b/src/confluent_kafka/schema_registry/common/avro.py index d28d232a3..9a41d060f 100644 --- a/src/confluent_kafka/schema_registry/common/avro.py +++ b/src/confluent_kafka/schema_registry/common/avro.py @@ -3,7 +3,7 @@ from collections import defaultdict from copy import deepcopy from io import BytesIO -from json import loads +import orjson from typing import Dict, Union, Optional, Set from fastavro import repository, validate @@ -91,7 +91,7 @@ def load(self, subject): def parse_schema_with_repo(schema_str: str, named_schemas: Dict[str, AvroSchema]) -> AvroSchema: copy = deepcopy(named_schemas) - copy["$root"] = loads(schema_str) + copy["$root"] = orjson.loads(schema_str) repo = LocalSchemaRepository(copy) return load_schema("$root", repo=repo) From 210a28ed740a35204580c2a32b4a195205cbd849 Mon Sep 17 00:00:00 2001 From: Naxin Date: Thu, 7 Aug 2025 14:27:37 -0400 Subject: [PATCH 2/3] unicode --- src/confluent_kafka/kafkatest/verifiable_client.py | 4 ++-- src/confluent_kafka/schema_registry/_async/json_schema.py | 2 +- .../schema_registry/_async/schema_registry_client.py | 2 +- src/confluent_kafka/schema_registry/_sync/json_schema.py | 2 +- .../schema_registry/_sync/schema_registry_client.py | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/confluent_kafka/kafkatest/verifiable_client.py b/src/confluent_kafka/kafkatest/verifiable_client.py index c42078b1e..bd0fca0a4 100644 --- a/src/confluent_kafka/kafkatest/verifiable_client.py +++ b/src/confluent_kafka/kafkatest/verifiable_client.py @@ -61,8 +61,8 @@ def err(self, s, term=False): def send(self, d): """ Send dict as JSON to stdout for consumtion by kafkatest handler """ d['_time'] = str(datetime.datetime.now()) - self.dbg('SEND: %s' % orjson.dumps(d)) - sys.stdout.write('%s\n' % orjson.dumps(d)) + self.dbg('SEND: %s' % orjson.dumps(d).decode("utf-8")) + sys.stdout.write('%s\n' % orjson.dumps(d).decode("utf-8")) sys.stdout.flush() @staticmethod diff --git a/src/confluent_kafka/schema_registry/_async/json_schema.py b/src/confluent_kafka/schema_registry/_async/json_schema.py index 8cacf3d79..2fbdbcccc 100644 --- a/src/confluent_kafka/schema_registry/_async/json_schema.py +++ b/src/confluent_kafka/schema_registry/_async/json_schema.py @@ -221,7 +221,7 @@ async def __init_impl( else: self._schema = None - self._json_encode = json_encode or orjson.dumps + self._json_encode = json_encode or (lambda x: orjson.dumps(x).decode("utf-8")) self._registry = schema_registry_client self._rule_registry = ( rule_registry if rule_registry else RuleRegistry.get_global_instance() diff --git a/src/confluent_kafka/schema_registry/_async/schema_registry_client.py b/src/confluent_kafka/schema_registry/_async/schema_registry_client.py index 9df5b295f..f067ca5ef 100644 --- a/src/confluent_kafka/schema_registry/_async/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_async/schema_registry_client.py @@ -416,7 +416,7 @@ async def send_request( " application/json"} if body is not None: - body = orjson.dumps(body) + body = orjson.dumps(x).decode('utf-8') headers = {'Content-Length': str(len(body)), 'Content-Type': "application/vnd.schemaregistry.v1+json"} diff --git a/src/confluent_kafka/schema_registry/_sync/json_schema.py b/src/confluent_kafka/schema_registry/_sync/json_schema.py index 3979890a1..116ed87d1 100644 --- a/src/confluent_kafka/schema_registry/_sync/json_schema.py +++ b/src/confluent_kafka/schema_registry/_sync/json_schema.py @@ -221,7 +221,7 @@ def __init_impl( else: self._schema = None - self._json_encode = json_encode or orjson.dumps + self._json_encode = json_encode or (lambda x: orjson.dumps(x).decode("utf-8")) self._registry = schema_registry_client self._rule_registry = ( rule_registry if rule_registry else RuleRegistry.get_global_instance() diff --git a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py index bf0f6825b..1dc7dd0c9 100644 --- a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py @@ -416,7 +416,7 @@ def send_request( " application/json"} if body is not None: - body = orjson.dumps(body) + body = orjson.dumps(x).decode('utf-8') headers = {'Content-Length': str(len(body)), 'Content-Type': "application/vnd.schemaregistry.v1+json"} From 4e55ba7c5a3f0357e991089d960d753dc2ad6b0c Mon Sep 17 00:00:00 2001 From: Naxin Date: Thu, 7 Aug 2025 15:29:18 -0400 Subject: [PATCH 3/3] update --- requirements/requirements-schemaregistry.txt | 2 +- .../schema_registry/_async/schema_registry_client.py | 2 +- .../schema_registry/_sync/schema_registry_client.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/requirements/requirements-schemaregistry.txt b/requirements/requirements-schemaregistry.txt index acf18c570..2d42669e8 100644 --- a/requirements/requirements-schemaregistry.txt +++ b/requirements/requirements-schemaregistry.txt @@ -2,4 +2,4 @@ attrs>=21.2.0 cachetools>=5.5.0 httpx>=0.26 authlib>=1.0.0 -orjson >= 3.10,<4 +orjson >= 3.10 diff --git a/src/confluent_kafka/schema_registry/_async/schema_registry_client.py b/src/confluent_kafka/schema_registry/_async/schema_registry_client.py index f067ca5ef..77a10ef27 100644 --- a/src/confluent_kafka/schema_registry/_async/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_async/schema_registry_client.py @@ -416,7 +416,7 @@ async def send_request( " application/json"} if body is not None: - body = orjson.dumps(x).decode('utf-8') + body = orjson.dumps(body).decode('utf-8') headers = {'Content-Length': str(len(body)), 'Content-Type': "application/vnd.schemaregistry.v1+json"} diff --git a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py index 1dc7dd0c9..060c541c4 100644 --- a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py @@ -416,7 +416,7 @@ def send_request( " application/json"} if body is not None: - body = orjson.dumps(x).decode('utf-8') + body = orjson.dumps(body).decode('utf-8') headers = {'Content-Length': str(len(body)), 'Content-Type': "application/vnd.schemaregistry.v1+json"}