Skip to content

Commit 765795c

Browse files
authored
Migrate to orjson (JSON library) for better performance (#2016)
* update * unicode * update
1 parent e9ba52e commit 765795c

File tree

10 files changed: 30 additions (+30) and 29 deletions (-29)

requirements/requirements-schemaregistry.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@ attrs>=21.2.0
22
cachetools>=5.5.0
33
httpx>=0.26
44
authlib>=1.0.0
5+
orjson >= 3.10

src/confluent_kafka/avro/serializer/message_serializer.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
# derived from https://github.com/verisign/python-confluent-schemaregistry.git
2121
#
2222
import io
23-
import json
23+
import orjson
2424
import logging
2525
import struct
2626
import sys
@@ -80,7 +80,7 @@ def __init__(self, registry_client, reader_key_schema=None, reader_value_schema=
8080
# Encoder support
8181
def _get_encoder_func(self, writer_schema):
8282
if HAS_FAST:
83-
schema = json.loads(str(writer_schema))
83+
schema = orjson.loads(str(writer_schema))
8484
parsed_schema = parse_schema(schema)
8585
return lambda record, fp: schemaless_writer(fp, parsed_schema, record)
8686
writer = avro.io.DatumWriter(writer_schema)
@@ -176,9 +176,9 @@ def _get_decoder_func(self, schema_id, payload, is_key=False):
176176
if HAS_FAST:
177177
# try to use fast avro
178178
try:
179-
fast_avro_writer_schema = parse_schema(json.loads(str(writer_schema_obj)))
179+
fast_avro_writer_schema = parse_schema(orjson.loads(str(writer_schema_obj)))
180180
if reader_schema_obj is not None:
181-
fast_avro_reader_schema = parse_schema(json.loads(str(reader_schema_obj)))
181+
fast_avro_reader_schema = parse_schema(orjson.loads(str(reader_schema_obj)))
182182
else:
183183
fast_avro_reader_schema = None
184184
schemaless_reader(payload, fast_avro_writer_schema)

src/confluent_kafka/kafkatest/verifiable_client.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515

1616
import datetime
17-
import json
17+
import orjson
1818
import os
1919
import re
2020
import signal
@@ -61,8 +61,8 @@ def err(self, s, term=False):
6161
def send(self, d):
6262
""" Send dict as JSON to stdout for consumption by kafkatest handler """
6363
d['_time'] = str(datetime.datetime.now())
64-
self.dbg('SEND: %s' % json.dumps(d))
65-
sys.stdout.write('%s\n' % json.dumps(d))
64+
self.dbg('SEND: %s' % orjson.dumps(d).decode("utf-8"))
65+
sys.stdout.write('%s\n' % orjson.dumps(d).decode("utf-8"))
6666
sys.stdout.flush()
6767

6868
@staticmethod

src/confluent_kafka/schema_registry/_async/avro.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
# See the License for the specific language governing permissions and
1616
# limitations under the License.
1717
import io
18-
from json import loads
18+
import orjson
1919
from typing import Dict, Union, Optional, Callable
2020

2121
from fastavro import schemaless_reader, schemaless_writer
@@ -270,7 +270,7 @@ async def __init_impl(
270270
# i.e. {"type": "string"} has a name of string.
271271
# This function does not comply.
272272
# https://github.com/fastavro/fastavro/issues/415
273-
schema_dict = loads(schema.schema_str)
273+
schema_dict = orjson.loads(schema.schema_str)
274274
schema_name = parsed_schema.get("name", schema_dict.get("type"))
275275
else:
276276
schema_name = None

src/confluent_kafka/schema_registry/_async/json_schema.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
# See the License for the specific language governing permissions and
1616
# limitations under the License.
1717
import io
18-
import json
18+
import orjson
1919
from typing import Union, Optional, Tuple, Callable
2020

2121
from cachetools import LRUCache
@@ -66,7 +66,7 @@ async def _resolve_named_schema(
6666
for ref in schema.references:
6767
referenced_schema = await schema_registry_client.get_version(ref.subject, ref.version, True)
6868
ref_registry = await _resolve_named_schema(referenced_schema.schema, schema_registry_client, ref_registry)
69-
referenced_schema_dict = json.loads(referenced_schema.schema.schema_str)
69+
referenced_schema_dict = orjson.loads(referenced_schema.schema.schema_str)
7070
resource = Resource.from_contents(
7171
referenced_schema_dict, default_specification=DEFAULT_SPEC)
7272
ref_registry = ref_registry.with_resource(ref.name, resource)
@@ -221,7 +221,7 @@ async def __init_impl(
221221
else:
222222
self._schema = None
223223

224-
self._json_encode = json_encode or json.dumps
224+
self._json_encode = json_encode or (lambda x: orjson.dumps(x).decode("utf-8"))
225225
self._registry = schema_registry_client
226226
self._rule_registry = (
227227
rule_registry if rule_registry else RuleRegistry.get_global_instance()
@@ -394,7 +394,7 @@ async def _get_parsed_schema(self, schema: Schema) -> Tuple[Optional[JsonSchema]
394394
return result
395395

396396
ref_registry = await _resolve_named_schema(schema, self._registry)
397-
parsed_schema = json.loads(schema.schema_str)
397+
parsed_schema = orjson.loads(schema.schema_str)
398398

399399
self._parsed_schemas.set(schema, (parsed_schema, ref_registry))
400400
return parsed_schema, ref_registry
@@ -510,7 +510,7 @@ async def __init_impl(
510510
self._rule_registry = rule_registry if rule_registry else RuleRegistry.get_global_instance()
511511
self._parsed_schemas = ParsedSchemaCache()
512512
self._validators = LRUCache(1000)
513-
self._json_decode = json_decode or json.loads
513+
self._json_decode = json_decode or orjson.loads
514514
self._use_schema_id = None
515515

516516
conf_copy = self._default_conf.copy()
@@ -659,7 +659,7 @@ async def _get_parsed_schema(self, schema: Schema) -> Tuple[Optional[JsonSchema]
659659
return result
660660

661661
ref_registry = await _resolve_named_schema(schema, self._registry)
662-
parsed_schema = json.loads(schema.schema_str)
662+
parsed_schema = orjson.loads(schema.schema_str)
663663

664664
self._parsed_schemas.set(schema, (parsed_schema, ref_registry))
665665
return parsed_schema, ref_registry

src/confluent_kafka/schema_registry/_async/schema_registry_client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
# limitations under the License.
1717
#
1818
import asyncio
19-
import json
19+
import orjson
2020
import logging
2121
import time
2222
import urllib
@@ -416,7 +416,7 @@ async def send_request(
416416
" application/json"}
417417

418418
if body is not None:
419-
body = json.dumps(body)
419+
body = orjson.dumps(body).decode('utf-8')
420420
headers = {'Content-Length': str(len(body)),
421421
'Content-Type': "application/vnd.schemaregistry.v1+json"}
422422

src/confluent_kafka/schema_registry/_sync/avro.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
# See the License for the specific language governing permissions and
1616
# limitations under the License.
1717
import io
18-
from json import loads
18+
import orjson
1919
from typing import Dict, Union, Optional, Callable
2020

2121
from fastavro import schemaless_reader, schemaless_writer
@@ -270,7 +270,7 @@ def __init_impl(
270270
# i.e. {"type": "string"} has a name of string.
271271
# This function does not comply.
272272
# https://github.com/fastavro/fastavro/issues/415
273-
schema_dict = loads(schema.schema_str)
273+
schema_dict = orjson.loads(schema.schema_str)
274274
schema_name = parsed_schema.get("name", schema_dict.get("type"))
275275
else:
276276
schema_name = None

src/confluent_kafka/schema_registry/_sync/json_schema.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
# See the License for the specific language governing permissions and
1616
# limitations under the License.
1717
import io
18-
import json
18+
import orjson
1919
from typing import Union, Optional, Tuple, Callable
2020

2121
from cachetools import LRUCache
@@ -66,7 +66,7 @@ def _resolve_named_schema(
6666
for ref in schema.references:
6767
referenced_schema = schema_registry_client.get_version(ref.subject, ref.version, True)
6868
ref_registry = _resolve_named_schema(referenced_schema.schema, schema_registry_client, ref_registry)
69-
referenced_schema_dict = json.loads(referenced_schema.schema.schema_str)
69+
referenced_schema_dict = orjson.loads(referenced_schema.schema.schema_str)
7070
resource = Resource.from_contents(
7171
referenced_schema_dict, default_specification=DEFAULT_SPEC)
7272
ref_registry = ref_registry.with_resource(ref.name, resource)
@@ -221,7 +221,7 @@ def __init_impl(
221221
else:
222222
self._schema = None
223223

224-
self._json_encode = json_encode or json.dumps
224+
self._json_encode = json_encode or (lambda x: orjson.dumps(x).decode("utf-8"))
225225
self._registry = schema_registry_client
226226
self._rule_registry = (
227227
rule_registry if rule_registry else RuleRegistry.get_global_instance()
@@ -394,7 +394,7 @@ def _get_parsed_schema(self, schema: Schema) -> Tuple[Optional[JsonSchema], Opti
394394
return result
395395

396396
ref_registry = _resolve_named_schema(schema, self._registry)
397-
parsed_schema = json.loads(schema.schema_str)
397+
parsed_schema = orjson.loads(schema.schema_str)
398398

399399
self._parsed_schemas.set(schema, (parsed_schema, ref_registry))
400400
return parsed_schema, ref_registry
@@ -510,7 +510,7 @@ def __init_impl(
510510
self._rule_registry = rule_registry if rule_registry else RuleRegistry.get_global_instance()
511511
self._parsed_schemas = ParsedSchemaCache()
512512
self._validators = LRUCache(1000)
513-
self._json_decode = json_decode or json.loads
513+
self._json_decode = json_decode or orjson.loads
514514
self._use_schema_id = None
515515

516516
conf_copy = self._default_conf.copy()
@@ -659,7 +659,7 @@ def _get_parsed_schema(self, schema: Schema) -> Tuple[Optional[JsonSchema], Opti
659659
return result
660660

661661
ref_registry = _resolve_named_schema(schema, self._registry)
662-
parsed_schema = json.loads(schema.schema_str)
662+
parsed_schema = orjson.loads(schema.schema_str)
663663

664664
self._parsed_schemas.set(schema, (parsed_schema, ref_registry))
665665
return parsed_schema, ref_registry

src/confluent_kafka/schema_registry/_sync/schema_registry_client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
# limitations under the License.
1717
#
1818

19-
import json
19+
import orjson
2020
import logging
2121
import time
2222
import urllib
@@ -416,7 +416,7 @@ def send_request(
416416
" application/json"}
417417

418418
if body is not None:
419-
body = json.dumps(body)
419+
body = orjson.dumps(body).decode('utf-8')
420420
headers = {'Content-Length': str(len(body)),
421421
'Content-Type': "application/vnd.schemaregistry.v1+json"}
422422

src/confluent_kafka/schema_registry/common/avro.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from collections import defaultdict
44
from copy import deepcopy
55
from io import BytesIO
6-
from json import loads
6+
import orjson
77
from typing import Dict, Union, Optional, Set
88

99
from fastavro import repository, validate
@@ -91,7 +91,7 @@ def load(self, subject):
9191

9292
def parse_schema_with_repo(schema_str: str, named_schemas: Dict[str, AvroSchema]) -> AvroSchema:
9393
copy = deepcopy(named_schemas)
94-
copy["$root"] = loads(schema_str)
94+
copy["$root"] = orjson.loads(schema_str)
9595
repo = LocalSchemaRepository(copy)
9696
return load_schema("$root", repo=repo)
9797

0 commit comments

Comments
 (0)