@@ -21,6 +21,7 @@ use crate::connectors::kafka::config::BufferConfig;
21
21
use crate :: connectors:: kafka:: { ConsumerRecord , StreamConsumer , TopicPartition } ;
22
22
use crate :: event:: format;
23
23
use crate :: event:: format:: EventFormat ;
24
+ use crate :: event:: Event as ParseableEvent ;
24
25
use crate :: handlers:: http:: ingest:: create_stream_if_not_exists;
25
26
use crate :: metadata:: { SchemaVersion , STREAM_INFO } ;
26
27
use crate :: storage:: StreamType ;
@@ -41,7 +42,7 @@ impl ParseableSinkProcessor {
41
42
async fn deserialize (
42
43
& self ,
43
44
consumer_record : & ConsumerRecord ,
44
- ) -> anyhow:: Result < Option < crate :: event :: Event > > {
45
+ ) -> anyhow:: Result < Option < ParseableEvent > > {
45
46
let stream_name = consumer_record. topic . as_str ( ) ;
46
47
47
48
create_stream_if_not_exists ( stream_name, & StreamType :: UserDefined . to_string ( ) ) . await ?;
@@ -69,7 +70,7 @@ impl ParseableSinkProcessor {
69
70
let ( record_batch, is_first) =
70
71
event. into_recordbatch ( & schema, None , None , SchemaVersion :: V1 ) ?;
71
72
72
- let p_event = crate :: event :: Event {
73
+ let p_event = ParseableEvent {
73
74
rb : record_batch,
74
75
stream_name : stream_name. to_string ( ) ,
75
76
origin_format : "json" ,
0 commit comments