-
Notifications
You must be signed in to change notification settings - Fork 94
Expand file tree
/
Copy pathprocessor.rs
More file actions
251 lines (230 loc) · 9.62 KB
/
processor.rs
File metadata and controls
251 lines (230 loc) · 9.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
//! Set of traits and structures used to implement processors.
//!
//! A processor is a node in the pipeline that transforms, filters, or otherwise processes messages
//! as they flow through the pipeline. Processors can perform operations such as:
//!
//! 1. Filtering messages based on certain criteria
//! 2. Transforming message content or format
//! 3. Aggregating multiple messages into a single message
//! 4. Splitting a single message into multiple messages
//! 5. Adding or removing attributes from messages
//!
//! # Lifecycle
//!
//! 1. The processor is instantiated and configured
//! 2. The processor receives and processes both data messages and control messages
//! 3. For each message, the processor can transform it, filter it, or split it into multiple messages
//! 4. The processor can maintain state between processing calls if needed
//! 5. The processor responds to control messages such as Config, TimerTick, or Shutdown
//! 6. The processor shuts down when it receives a `Shutdown` control message or encounters a fatal error
//!
//! # Thread Safety
//!
//! This implementation is designed for use in both single-threaded and multi-threaded environments.
//! The `Processor` trait requires the `Send` bound, enabling the use of thread-safe types.
//!
//! # Scalability
//!
//! To ensure scalability, the pipeline engine will start multiple instances of the same pipeline
//! in parallel on different cores, each with its own processor instance.
use crate::control::{AckMsg, NackMsg};
use crate::effect_handler::{EffectHandlerCore, TelemetryTimerCancelHandle, TimerCancelHandle};
use crate::error::{Error, ProcessorErrorKind, TypedError};
use crate::message::Message;
use crate::node::NodeId;
use crate::shared::message::SharedSender;
use async_trait::async_trait;
use otap_df_config::PortName;
use otap_df_telemetry::error::Error as TelemetryError;
use otap_df_telemetry::metrics::{MetricSet, MetricSetHandler};
use otap_df_telemetry::reporter::MetricsReporter;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
/// A trait for processors in the pipeline (Send definition).
///
/// A processor exposes a single entry point, [`Processor::process`], which receives each
/// incoming message together with the [`EffectHandler`] used to emit results.
#[async_trait]
pub trait Processor<PData> {
/// Processes a message and optionally produces effects, such as generating new pdata messages.
///
/// This method is called by the pipeline engine for each message that arrives at the processor.
/// Unlike receivers, processors have known inputs (messages from previous stages), so the pipeline
/// engine can control when to call this method and when the processor executes.
///
/// This approach allows for greater flexibility and optimization, giving the pipeline engine
/// the ability to decide whether to spawn one task per processor or one task for a group of processors.
/// The method signature uses `&mut self` rather than `Box<Self>` because the engine only wants to
/// temporarily allow mutation of the processor instance, not transfer ownership.
///
/// The processor can:
/// - Transform the message and forward the result through `effect_handler`
/// - Filter the message by forwarding nothing
/// - Split the message into multiple messages by forwarding each of them through `effect_handler`
/// - Handle control messages (e.g., Config, TimerTick, Shutdown)
///
/// Note that nothing is returned to the caller besides success or failure: the success value
/// is `()`, so every outgoing message has to go through `effect_handler`.
///
/// # Parameters
///
/// - `msg`: The message to process, which can be either a data message or a control message
/// - `effect_handler`: A handler to perform side effects such as sending messages to the next node.
/// This trait takes the shared (`Send`) [`EffectHandler`] declared in this module.
///
/// # Returns
///
/// - `Ok(())`: The processor successfully processed the message
/// - `Err(Error)`: The processor encountered an error and could not process the message
///
/// # Errors
///
/// Returns an [`Error`] if the processor encounters an unrecoverable error.
async fn process(
&mut self,
msg: Message<PData>,
effect_handler: &mut EffectHandler<PData>,
) -> Result<(), Error>;
}
/// A `Send` implementation of the EffectHandler.
///
/// The handler is made of two parts: a `core` that carries the node-level services
/// (node id, timers, ack/nack routing, metrics reporting) and a table of output senders.
/// The sender table sits behind an [`Arc`], so clones of a handler share one table
/// instead of copying it.
///
/// Note: because `Clone` is derived, it is only available when `PData: Clone`.
#[derive(Clone)]
pub struct EffectHandler<PData> {
/// Node-level services; the methods of this handler that are not about sending pdata
/// delegate to it.
pub(crate) core: EffectHandlerCore<PData>,
/// Output senders (per-port table plus cached default), shared between clones.
senders: Arc<EffectHandlerSenders<PData>>,
}
/// Output side of an [`EffectHandler`]: the senders of every connected out port and the
/// default sender resolved when the handler was built.
struct EffectHandlerSenders<PData> {
/// Senders used to forward messages from the processor, keyed by out-port name.
/// Supports multiple named output ports.
msg_senders: HashMap<PortName, SharedSender<PData>>,
/// Cached default sender for fast access in the hot path.
/// `None` when no default could be resolved at construction time.
default_sender: Option<SharedSender<PData>>,
}
/// Implementation for the `Send` effect handler.
impl<PData> EffectHandler<PData> {
/// Creates a new shared (Send) `EffectHandler` with the given processor name and pdata sender.
#[must_use]
pub fn new(
node_id: NodeId,
msg_senders: HashMap<PortName, SharedSender<PData>>,
default_port: Option<PortName>,
metrics_reporter: MetricsReporter,
) -> Self {
let core = EffectHandlerCore::new(node_id, metrics_reporter);
// Determine and cache the default sender
let default_sender = if let Some(ref port) = default_port {
msg_senders.get(port).cloned()
} else if msg_senders.len() == 1 {
msg_senders.values().next().cloned()
} else {
None
};
let senders = Arc::new(EffectHandlerSenders {
msg_senders,
default_sender,
});
EffectHandler { core, senders }
}
/// Returns the id of the processor associated with this handler.
#[must_use]
pub fn processor_id(&self) -> NodeId {
self.core.node_id()
}
/// Returns the list of connected out ports for this processor.
#[must_use]
pub fn connected_ports(&self) -> Vec<PortName> {
self.senders.msg_senders.keys().cloned().collect()
}
/// Sends a message to the next node(s) in the pipeline.
///
/// # Errors
///
/// Returns an [`Error::ProcessorError`] if the message could not be routed to a port.
#[inline]
pub async fn send_message(&self, data: PData) -> Result<(), TypedError<PData>> {
match &self.senders.default_sender {
Some(sender) => sender
.send(data)
.await
.map_err(TypedError::ChannelSendError),
None => Err(TypedError::Error(Error::ProcessorError {
processor: self.processor_id(),
kind: ProcessorErrorKind::Configuration,
error:
"Ambiguous default out port: multiple ports connected and no default configured"
.to_string(),
source_detail: String::new(),
})),
}
}
/// Sends a message to a specific named out port.
#[inline]
pub async fn send_message_to<P>(&self, port: P, data: PData) -> Result<(), TypedError<PData>>
where
P: Into<PortName>,
{
let port_name: PortName = port.into();
match self.senders.msg_senders.get(&port_name) {
Some(sender) => sender
.send(data)
.await
.map_err(TypedError::ChannelSendError),
None => Err(TypedError::Error(Error::ProcessorError {
processor: self.processor_id(),
kind: ProcessorErrorKind::Configuration,
error: format!(
"Unknown out port '{port_name}' for node {}",
self.processor_id()
),
source_detail: String::new(),
})),
}
}
/// Print an info message to stdout.
///
/// This method provides a standardized way for processors to output
/// informational messages without blocking the async runtime.
pub async fn info(&self, message: &str) {
self.core.info(message).await;
}
/// Starts a cancellable periodic timer that emits TimerTick on the control channel.
/// Returns a handle that can be used to cancel the timer.
///
/// Current limitation: Only one timer can be started by a processor at a time.
pub async fn start_periodic_timer(
&self,
duration: Duration,
) -> Result<TimerCancelHandle<PData>, Error> {
self.core.start_periodic_timer(duration).await
}
/// Starts a cancellable periodic telemetry timer that emits CollectTelemetry.
pub async fn start_periodic_telemetry(
&self,
duration: Duration,
) -> Result<TelemetryTimerCancelHandle<PData>, Error> {
self.core.start_periodic_telemetry(duration).await
}
/// Send an Ack to a node of known-interest.
pub async fn route_ack<F>(&self, ack: AckMsg<PData>, cxf: F) -> Result<(), Error>
where
F: FnOnce(AckMsg<PData>) -> Option<(usize, AckMsg<PData>)>,
{
self.core.route_ack(ack, cxf).await
}
/// Send a Nack to a node of known-interest.
pub async fn route_nack<F>(&self, nack: NackMsg<PData>, cxf: F) -> Result<(), Error>
where
F: FnOnce(NackMsg<PData>) -> Option<(usize, NackMsg<PData>)>,
{
self.core.route_nack(nack, cxf).await
}
/// Delay data.
pub async fn delay_data(&self, when: Instant, data: Box<PData>) -> Result<(), PData> {
self.core.delay_data(when, data).await
}
/// Reports metrics collected by the processor.
#[allow(dead_code)] // Will be used in the future. ToDo report metrics from channel and messages.
pub(crate) fn report_metrics<M: MetricSetHandler + 'static>(
&mut self,
metrics: &mut MetricSet<M>,
) -> Result<(), TelemetryError> {
self.core.report_metrics(metrics)
}
// More methods will be added in the future as needed.
}