Skip to content

Commit

Permalink
create an instance of each necessary validator when handling schema instead of running it in the loop (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
arilton authored Oct 8, 2024
1 parent 77dffef commit 64d16b8
Showing 1 changed file with 8 additions and 3 deletions.
11 changes: 8 additions & 3 deletions target_bigquery/processhandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from google.cloud.bigquery import WriteDisposition
from google.cloud.bigquery.job import SourceFormat
from google.cloud.exceptions import NotFound
from jsonschema import validate
from jsonschema.validators import validator_for

from target_bigquery.encoders import DecimalEncoder
from target_bigquery.schema import build_schema, cleanup_record, format_record_to_schema
Expand Down Expand Up @@ -45,6 +45,7 @@ def __init__(self, logger, **kwargs):

self.tables = {}
self.schemas = {}
self.validators = {}
self.key_properties = {}
self.bq_schemas = {}
self.bq_schema_dicts = {}
Expand Down Expand Up @@ -84,6 +85,9 @@ def handle_schema_message(self, msg):
self.table_prefix, msg.stream, self.table_suffix
)
self.schemas[msg.stream] = msg.schema
validator_cls = validator_for(msg.schema)
validator_cls.check_schema(msg.schema)
self.validators[msg.stream] = validator_cls(msg.schema)
self.key_properties[msg.stream] = msg.key_properties

validate_json_schema_completeness(self.schemas[msg.stream])
Expand Down Expand Up @@ -194,11 +198,12 @@ def handle_record_message(self, msg):
# Schema validation may fail if the data doesn't match the schema in terms of data types.
# In that case, we validate again using data that has been forced to match the schema:
# nr is based on msg.record, but all data from msg.record has been forced to match the schema.
validator = self.validators[stream]
if self.validate_records:
try:
validate(msg.record, schema)
validator.validate(msg.record, schema)
except Exception as e:
validate(nr, schema)
validator.validate(nr, schema)

if self.add_metadata_columns:
nr["_time_extracted"] = msg.time_extracted.isoformat() \
Expand Down

0 comments on commit 64d16b8

Please sign in to comment.