Skip to content

Commit bb12d04

Browse files
committed
fix ParseableSinkProcessor.deserialize
1 parent 4ecba10 commit bb12d04

File tree

2 files changed

+15
-11
lines changed

2 files changed

+15
-11
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/connectors/kafka/processor.rs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@ use crate::connectors::common::processor::Processor;
2020
use crate::connectors::kafka::config::BufferConfig;
2121
use crate::connectors::kafka::{ConsumerRecord, StreamConsumer, TopicPartition};
2222
use crate::event::format;
23-
use crate::event::format::EventFormat;
23+
use crate::event::format::{EventFormat, LogSource};
2424
use crate::event::Event as ParseableEvent;
2525
use crate::handlers::http::ingest::create_stream_if_not_exists;
26-
use crate::metadata::{SchemaVersion, STREAM_INFO};
26+
use crate::metadata::STREAM_INFO;
2727
use crate::storage::StreamType;
2828
use async_trait::async_trait;
2929
use chrono::Utc;
@@ -59,19 +59,22 @@ impl ParseableSinkProcessor {
5959
}
6060
Some(payload) => {
6161
let data: Value = serde_json::from_slice(payload.as_ref())?;
62+
let json_event = format::json::Event { data };
6263

63-
let event = format::json::Event {
64-
data,
65-
tags: String::default(),
66-
metadata: String::default(),
67-
};
64+
let time_partition = STREAM_INFO.get_time_partition(stream_name)?;
65+
let static_schema_flag = STREAM_INFO.get_static_schema_flag(stream_name)?;
66+
let schema_version = STREAM_INFO.get_schema_version(stream_name)?;
6867

69-
// TODO: Implement a buffer (e.g., a wrapper around [Box<dyn ArrayBuilder>]) to optimize the creation of ParseableEvent by compacting the internal RecordBatch.
70-
let (record_batch, is_first) =
71-
event.into_recordbatch(&schema, None, None, SchemaVersion::V1)?;
68+
let (rb, is_first) = json_event.into_recordbatch(
69+
&schema,
70+
static_schema_flag.as_ref(),
71+
time_partition.as_ref(),
72+
schema_version,
73+
&LogSource::Json,
74+
)?;
7275

7376
let p_event = ParseableEvent {
74-
rb: record_batch,
77+
rb,
7578
stream_name: stream_name.to_string(),
7679
origin_format: "json",
7780
origin_size: payload.len() as u64,

0 commit comments

Comments
 (0)