Skip to content

Commit 6a45270

Browse files
committed
fixup! fixup! fixup! fixup! fixup! fixup! fixup! Entity-cache preparation step for mea transformers
1 parent 86f7c07 commit 6a45270

2 files changed

Lines changed: 11 additions & 14 deletions

File tree

crates/extensions/c8y_mapper_ext/src/flows.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@ impl C8yMapperBuilder {
2121
};
2222
let mut flows = ConnectedFlowRegistry::new(flows_dir);
2323

24-
flows.register_builtin(crate::mea::message_cache::MessageCache::default());
24+
let mapper_topic_id = self.config.service_topic_id.clone();
25+
flows.register_builtin(crate::mea::message_cache::MessageCache::new(
26+
mapper_topic_id,
27+
));
2528
flows.register_builtin(crate::mea::measurements::MeasurementConverter::default());
2629
flows.register_builtin(crate::mea::events::EventConverter::default());
2730
flows.register_builtin(crate::mea::alarms::AlarmConverter::default());
@@ -109,7 +112,7 @@ topic = "{errors_topic}"
109112
110113
steps = [
111114
{{ builtin = "add-timestamp", config = {{ property = "time", format = "unix", reformat = false }} }},
112-
{{ builtin = "cache-early-messages", config = {{ topic_root = "{topic_prefix}", mapper_topic_id = "{mapper_topic_id}" }} }},
115+
{{ builtin = "cache-early-messages", config = {{ topic_root = "{topic_prefix}" }} }},
113116
{{ builtin = "into-c8y-measurements", config = {{ topic_root = "{topic_prefix}" }} }},
114117
{{ builtin = "limit-payload-size", config = {{ max_size = {max_size} }} }},
115118
]
@@ -140,7 +143,7 @@ topic = "{errors_topic}"
140143
141144
steps = [
142145
{{ builtin = "add-timestamp", config = {{ property = "time", format = "rfc3339", reformat = false }} }},
143-
{{ builtin = "cache-early-messages", config = {{ topic_root = "{topic_prefix}", mapper_topic_id = "{mapper_topic_id}" }} }},
146+
{{ builtin = "cache-early-messages", config = {{ topic_root = "{topic_prefix}" }} }},
144147
{{ builtin = "into-c8y-events", config = {{ topic_root = "{topic_prefix}", c8y_prefix = "{c8y_prefix}", max_mqtt_payload_size = {max_mqtt_payload_size} }} }},
145148
]
146149
@@ -171,7 +174,7 @@ topic = "{errors_topic}"
171174
172175
steps = [
173176
{{ builtin = "add-timestamp", config = {{ property = "time", format = "rfc3339", reformat = false }} }},
174-
{{ builtin = "cache-early-messages", config = {{ topic_root = "{topic_prefix}", mapper_topic_id = "{mapper_topic_id}" }} }},
177+
{{ builtin = "cache-early-messages", config = {{ topic_root = "{topic_prefix}" }} }},
175178
{{ builtin = "into-c8y-alarms", interval = "3s", config = {{ topic_root = "{topic_prefix}", c8y_prefix = "{c8y_prefix}" }} }},
176179
{{ builtin = "limit-payload-size", config = {{ max_size = {max_size} }} }},
177180
]
@@ -201,7 +204,7 @@ topic = "{errors_topic}"
201204
r#"input.mqtt.topics = {input_topics:?}
202205
203206
steps = [
204-
{{ builtin = "cache-early-messages", config = {{ topic_root = "{topic_prefix}", mapper_topic_id = "{mapper_topic_id}" }} }},
207+
{{ builtin = "cache-early-messages", config = {{ topic_root = "{topic_prefix}" }} }},
205208
{{ builtin = "into-c8y-health-status", config = {{ topic_root = "{topic_prefix}", main_device = "{main_device}", c8y_prefix = "{c8y_prefix}" }} }},
206209
]
207210

crates/extensions/c8y_mapper_ext/src/mea/message_cache.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use crate::mea::entities::C8yEntityBirth;
22
use crate::mea::get_entity_metadata;
33
use std::collections::HashMap;
4-
use std::str::FromStr;
54
use std::time::SystemTime;
65
use tedge_api::mqtt_topics::Channel;
76
use tedge_api::mqtt_topics::EntityTopicId;
@@ -24,11 +23,11 @@ pub struct MessageCache {
2423
cache: HashMap<EntityTopicId, RingBuffer<Message>>,
2524
}
2625

27-
impl Default for MessageCache {
28-
fn default() -> Self {
26+
impl MessageCache {
27+
pub fn new(mapper_topic_id: EntityTopicId) -> Self {
2928
MessageCache {
3029
mqtt_schema: MqttSchema::default(),
31-
mapper_topic_id: EntityTopicId::default_main_service("tedge-mapper-c8y").unwrap(),
30+
mapper_topic_id,
3231
cache: HashMap::default(),
3332
}
3433
}
@@ -43,11 +42,6 @@ impl tedge_flows::Transformer for MessageCache {
4342
if let Some(root) = config.string_property("topic_root") {
4443
self.mqtt_schema = MqttSchema::with_root(root.to_string())
4544
}
46-
if let Some(mapper_topic_id) = config.string_property("mapper_topic_id") {
47-
self.mapper_topic_id = EntityTopicId::from_str(mapper_topic_id).map_err(|err| {
48-
ConfigError::IncorrectSetting(format!("Not a valid entity topic id: {}", err))
49-
})?;
50-
}
5145
Ok(())
5246
}
5347

0 commit comments

Comments
 (0)