diff --git a/Cargo.lock b/Cargo.lock index bdf8fd05..a41e9030 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -382,6 +382,41 @@ dependencies = [ "syn 2.0.79", ] +[[package]] +name = "darling" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f63b86c8a8826a49b8c21f08a2d07338eec8d900540f8630dc76284be802989" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95133861a8032aaea082871032f5815eb9e98cef03fa916ab4500513994df9e5" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn 2.0.79", +] + +[[package]] +name = "darling_macro" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" +dependencies = [ + "darling_core", + "quote", + "syn 2.0.79", +] + [[package]] name = "dashmap" version = "6.1.0" @@ -488,6 +523,7 @@ dependencies = [ "tokio", "tokio-cron-scheduler", "tokio-util", + "validator", ] [[package]] @@ -576,6 +612,15 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "form_urlencoded" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" +dependencies = [ + "percent-encoding", +] + [[package]] name = "futures" version = "0.3.30" @@ -756,6 +801,16 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" +[[package]] +name = "idna" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "indexmap" version = "2.5.0" @@ -1044,6 +1099,12 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8835116a5c179084a830efb3adc117ab007512b535bc1a21c991d3b32a6b44dd" +[[package]] +name = "percent-encoding" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" + [[package]] name = "pin-project-lite" version = "0.2.14" @@ -1565,6 +1626,21 @@ dependencies = [ "winapi", ] +[[package]] +name = "tinyvec" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "445e881f4f6d382d5f27c034e25eb92edd7c784ceab92a0937db7f2e9471b938" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" + [[package]] name = "tokio" version = "1.40.0" @@ -1706,12 +1782,27 @@ dependencies = [ "unsafe-any-ors", ] +[[package]] +name = "unicode-bidi" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ab17db44d7388991a428b2ee655ce0c212e862eff1768a455c58f9aad6e7893" + [[package]] name = "unicode-ident" version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe" +[[package]] +name = "unicode-normalization" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5033c97c4262335cded6d6fc3e5c18ab755e1a3dc96376350f3d8e9f009ad956" +dependencies = [ + "tinyvec", +] + [[package]] name = "unicode-segmentation" version = "1.12.0" @@ -1733,6 +1824,17 @@ dependencies = [ "destructure_traitobject", ] +[[package]] +name = "url" +version = "2.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22784dbdf76fdde8af1aeda5622b546b422b6fc585325248a2bf9f5e41e94d6c" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", +] + [[package]] name = "utf8parse" version = "0.2.2" @@ -1748,6 +1850,36 @@ dependencies = [ "getrandom", ] +[[package]] +name = "validator" +version = "0.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db79c75af171630a3148bd3e6d7c4f42b6a9a014c2945bc5ed0020cbb8d9478e" +dependencies = [ + "idna", + "once_cell", + "regex", + "serde", + "serde_derive", + "serde_json", + "url", + "validator_derive", +] + +[[package]] +name = "validator_derive" +version = "0.18.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df0bcf92720c40105ac4b2dda2a4ea3aa717d4d6a862cc217da653a4bd5c6b10" +dependencies = [ + "darling", + "once_cell", + "proc-macro-error", + "proc-macro2", + "quote", + "syn 2.0.79", +] + [[package]] name = "version_check" version = "0.9.5" diff --git a/README.md b/README.md index 20dc030d..76163205 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,8 @@ After completing the flow design in Node-RED, please ensure that you click the b ### 1. Build +Using Rust 1.80 or later, run: + ```bash cargo build -r ``` @@ -99,7 +101,8 @@ py.test ## Configuration -Adjust various settings in the configuration file, such as port number, `flows.json` path, etc. Refer to [CONFIG.md](docs/CONFIG.md) for more information. +Adjust various settings and configuration, please execute `edgelinkd` with flags. +The flags available can be found when executing `edgelinkd --help`. ## Project Status diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 2a5dfd0c..33410356 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -50,6 +50,8 @@ smallvec.workspace = true smallstr.workspace = true inventory.workspace = true arrayvec = { workspace = true, features = ["std", "serde"] } +validator = { version = "0.18.1", features = ["derive"] } + [dev-dependencies] # Enable test-utilities in dev mode only. This is mostly for tests. diff --git a/crates/core/src/runtime/engine.rs b/crates/core/src/runtime/engine.rs index 664c072b..e8ee0180 100644 --- a/crates/core/src/runtime/engine.rs +++ b/crates/core/src/runtime/engine.rs @@ -15,6 +15,7 @@ use super::nodes::FlowNodeBehavior; use crate::runtime::model::Variant; use crate::runtime::nodes::{GlobalNodeBehavior, NodeFactory}; use crate::*; +use crate::utils::constants::SUB_FLOW_TYPE; #[derive(Debug, Clone, Deserialize, Default)] pub struct EngineArgs { @@ -161,7 +162,12 @@ impl Engine { ) -> crate::Result<()> { // load flows for flow_config in flow_cfg.into_iter() { - log::debug!("---- Loading flow/subflow: (id='{}', label='{}')...", flow_config.id, flow_config.label); + if &flow_config.type_name==SUB_FLOW_TYPE { + log::debug!("---- Loading subflow: (id='{}', label='{}')...", flow_config.id, flow_config.label); + }else{ + log::debug!("---- Loading flow: (id='{}', label='{}')...", flow_config.id, flow_config.label); + } + let flow = Flow::new(self, flow_config, reg, elcfg)?; { // register all nodes @@ -171,7 +177,7 @@ impl Engine { "This flow node already existed: {}", fnode )) - .into()); + .into()); } self.inner.all_flow_nodes.insert(fnode.id(), fnode.clone()); } diff --git a/crates/core/src/runtime/flow.rs b/crates/core/src/runtime/flow.rs index 9223d960..fe835edd 100644 --- a/crates/core/src/runtime/flow.rs +++ b/crates/core/src/runtime/flow.rs @@ -19,7 +19,8 @@ use crate::runtime::model::json::*; use crate::runtime::model::*; use crate::runtime::nodes::*; use crate::runtime::registry::Registry; -use crate::EdgelinkError; +use crate::{EdgelinkError, handle_option}; +use crate::utils::constants::{ENV_STR, FLOW_STR, SUB_FLOW_TYPE, SUB_FLOW_TYPE_HEAD, TAB_STR}; const NODE_MSG_CHANNEL_CAPACITY: usize = 32; @@ -71,6 +72,17 @@ pub enum FlowKind { Subflow, } +impl FlowKind { + pub fn from(value: &str) -> Option { + if value == SUB_FLOW_TYPE { + return Some(FlowKind::Subflow); + }else if value == TAB_STR { + return Some(FlowKind::GlobalFlow); + } + None + } +} + #[derive(Debug)] struct InnerFlow { id: ElementId, @@ -185,11 +197,8 @@ impl Flow { reg: &RegistryHandle, options: Option<&config::Config>, ) -> crate::Result { - let flow_kind = match flow_config.type_name.as_str() { - "tab" => FlowKind::GlobalFlow, - "subflow" => FlowKind::Subflow, - _ => return Err(EdgelinkError::BadFlowsJson("Unsupported flow type".to_string()).into()), - }; + + let flow_kind = handle_option!(result: FlowKind::from(&flow_config.type_name),EdgelinkError::BadFlowsJson,str:"Unsupported flow type"); let subflow_instance = flow_config.subflow_node_id.and_then(|x| engine.find_flow_node_by_id(&x)); @@ -205,7 +214,7 @@ impl Flow { } } }; - if let Some(env_json) = flow_config.rest.get("env") { + if let Some(env_json) = flow_config.rest.get(ENV_STR) { envs_builder = envs_builder.load_json(env_json); } if let Some(ref instance) = subflow_instance { @@ -249,8 +258,8 @@ impl Flow { ordering: flow_config.ordering, _args: args.clone(), type_str: match flow_kind { - FlowKind::GlobalFlow => "flow", - FlowKind::Subflow => "subflow", + FlowKind::GlobalFlow => FLOW_STR, + FlowKind::Subflow => SUB_FLOW_TYPE, }, groups: DashMap::new(), nodes: DashMap::new(), @@ -309,8 +318,8 @@ impl Flow { for node_config in flow_config.nodes.iter() { let meta_node = if let Some(meta_node) = reg.get(&node_config.type_name) { meta_node - } else if node_config.type_name.starts_with("subflow:") { - reg.get("subflow").expect("The `subflow` node must be existed") + } else if node_config.type_name.starts_with(SUB_FLOW_TYPE_HEAD) { + reg.get(SUB_FLOW_TYPE).expect("The `subflow` node must be existed") } else { log::warn!( "Unknown flow node type: (type='{}', id='{}', name='{}')", @@ -637,7 +646,7 @@ impl Flow { } else { envs_builder = envs_builder.with_parent(self.get_envs()); } - if let Some(env_json) = node_config.rest.get("env") { + if let Some(env_json) = node_config.rest.get(ENV_STR) { envs_builder = envs_builder.load_json(env_json); } let envs = envs_builder diff --git a/crates/core/src/runtime/group.rs b/crates/core/src/runtime/group.rs index 676667e0..cb20a9d0 100644 --- a/crates/core/src/runtime/group.rs +++ b/crates/core/src/runtime/group.rs @@ -1,5 +1,6 @@ use std::sync::Arc; use std::sync::Weak; +use crate::utils::constants::ENV_STR; use super::env::*; use super::flow::*; @@ -119,7 +120,7 @@ impl Group { } fn build_envs(mut envs_builder: EnvStoreBuilder, config: &RedGroupConfig) -> Envs { - if let Some(env_json) = config.rest.get("env") { + if let Some(env_json) = config.rest.get(ENV_STR) { envs_builder = envs_builder.load_json(env_json); } envs_builder diff --git a/crates/core/src/runtime/model/json/deser.rs b/crates/core/src/runtime/model/json/deser.rs index 5f553bb6..63e1a500 100644 --- a/crates/core/src/runtime/model/json/deser.rs +++ b/crates/core/src/runtime/model/json/deser.rs @@ -11,7 +11,8 @@ use serde_json::Value as JsonValue; use crate::runtime::model::ElementId; use crate::text::json::{option_value_equals_str, EMPTY_ARRAY}; -use crate::EdgelinkError; +use crate::{EdgelinkError, get_json_value}; +use crate::utils::constants::{ENV_STR, FLOW_STR, ID_STR, NAME_STR, SUB_FLOW_TYPE, TAB_STR, TYPE_STR}; use super::*; @@ -34,18 +35,18 @@ pub fn load_flows_json_value(root_jv: JsonValue) -> crate::Result for jobject in all_values.iter() { if let Some(obj) = jobject.as_object() { if let (Some(ele_id), Some(type_value)) = ( - obj.get("id").and_then(parse_red_id_value), - obj.get("type").and_then(|x| x.as_str()).map(|x| parse_red_type_value(x)), + obj.get(ID_STR).and_then(parse_red_id_value), + obj.get(TYPE_STR).and_then(|x| x.as_str()).map(|x| parse_red_type_value(x)), ) { match type_value.red_type { - "tab" => { + value if TAB_STR ==value => { let deps = obj.get_flow_dependencies(all_values); flow_topo_sort.add_vertex(ele_id); flow_topo_sort.add_deps(ele_id, deps); flows.insert(ele_id, jobject.clone()); } - "subflow" => { + value if value== SUB_FLOW_TYPE => { if type_value.id.is_some() { // "subflow:aabbccddee" We got a node that links to the subflow let deps = obj.get_flow_node_dependencies(); @@ -122,8 +123,8 @@ pub fn load_flows_json_value(root_jv: JsonValue) -> crate::Result log::debug!( "SORTED_NODES: node.id='{}', node.name='{}', node.type='{}'", node_id, - node.get("name").and_then(|x| x.as_str()).unwrap_or(""), - node.get("type").and_then(|x| x.as_str()).unwrap_or("") + node.get(NAME_STR).and_then(|x| x.as_str()).unwrap_or(""), + node.get(TYPE_STR).and_then(|x| x.as_str()).unwrap_or("") ); sorted_flow_nodes.push(node); } else { @@ -136,11 +137,11 @@ pub fn load_flows_json_value(root_jv: JsonValue) -> crate::Result let mut flow_config: RedFlowConfig = serde_json::from_value(flow)?; flow_config.ordering = flow_ordering; - flow_config.subflow_node_id = if flow_config.type_name == "subflow" { - let key_type = format!("subflow:{}", flow_config.id); + flow_config.subflow_node_id = if flow_config.type_name == SUB_FLOW_TYPE { + let key_type = format!("{SUB_FLOW_TYPE}:{}", flow_config.id); let node = - all_values.iter().find(|x| x.get("type").and_then(|y| y.as_str()).is_some_and(|y| y == key_type)); - node.and_then(|x| x.get("id")).and_then(parse_red_id_value) + all_values.iter().find(|x| x.get(TYPE_STR).and_then(|y| y.as_str()).is_some_and(|y| y == key_type)); + node.and_then(|x| x.get(ID_STR)).and_then(parse_red_id_value) } else { None }; @@ -179,15 +180,15 @@ fn preprocess_subflows(jv_root: JsonValue) -> crate::Result { // Find out all of subflow related elements for jv in elements.iter() { - if let Some(("subflow", subflow_id)) = jv.get("type").and_then(|x| x.as_str()).and_then(|x| x.split_once(':')) { + if let Some(("subflow", subflow_id)) = jv.get(TYPE_STR).and_then(|x| x.as_str()).and_then(|x| x.split_once(':')) { let subflow = elements .iter() - .find(|x| x.get("id").and_then(|y| y.as_str()).is_some_and(|y| y == subflow_id)) + .find(|x| x.get(ID_STR).and_then(|y| y.as_str()).is_some_and(|y| y == subflow_id)) .ok_or(EdgelinkError::BadFlowsJson(format!( "The cannot found the subflow for subflow instance node(id='{}', type='{}', name='{}')", subflow_id, - jv.get("type").and_then(|x| x.as_str()).unwrap_or(""), - jv.get("name").and_then(|x| x.as_str()).unwrap_or("") + get_json_value!(jv,TYPE_STR,str), + get_json_value!(jv,NAME_STR,str), )))?; // All elements belongs to this flow @@ -215,23 +216,23 @@ fn preprocess_subflows(jv_root: JsonValue) -> crate::Result { // "subflow" element { let mut new_subflow = pack.subflow.clone(); - new_subflow["id"] = JsonValue::String(subflow_new_id.to_string()); - id_map.insert(pack.subflow_id.to_string(), new_subflow["id"].as_str().unwrap().to_string()); + new_subflow[ID_STR] = JsonValue::String(subflow_new_id.to_string()); + id_map.insert(pack.subflow_id.to_string(),subflow_new_id.to_string()); new_elements.push(new_subflow); } // the fixed subflow instance node { let mut new_instance = pack.instance.clone(); - new_instance["type"] = JsonValue::String(format!("subflow:{}", subflow_new_id)); + new_instance[TYPE_STR] = JsonValue::String(format!("subflow:{}", subflow_new_id)); new_elements.push(new_instance); } // The children elements in the subflow for old_child in pack.children.iter() { let mut new_child = (*old_child).clone(); - new_child["id"] = generate_new_xored_id_value(subflow_new_id, old_child["id"].as_str().unwrap())?; - id_map.insert(old_child["id"].as_str().unwrap().to_string(), new_child["id"].as_str().unwrap().to_string()); + new_child[ID_STR] = generate_new_xored_id_value(subflow_new_id, old_child[ID_STR].as_str().unwrap())?; + id_map.insert(old_child[ID_STR].as_str().unwrap().to_string(), new_child[ID_STR].as_str().unwrap().to_string()); new_elements.push(new_child); } } @@ -253,10 +254,10 @@ fn preprocess_subflows(jv_root: JsonValue) -> crate::Result { } // Replace the nested flow instance `type` property - if let Some(JsonValue::String(pvalue)) = node.get_mut("type") { + if let Some(JsonValue::String(pvalue)) = node.get_mut(TYPE_STR) { if let Some(("subflow", old_id)) = pvalue.split_once(':') { if let Some(new_id) = id_map.get(old_id) { - *pvalue = format!("subflow:{}", new_id); + *pvalue = format!("{SUB_FLOW_TYPE}:{}", new_id); } } } @@ -302,7 +303,7 @@ fn preprocess_subflows(jv_root: JsonValue) -> crate::Result { if let Some(JsonValue::Array(in_props)) = node.get_mut("in") { for in_item in in_props.iter_mut() { for wires_item in in_item["wires"].as_array_mut().unwrap().iter_mut() { - if let Some(JsonValue::String(pvalue)) = wires_item.get_mut("id") { + if let Some(JsonValue::String(pvalue)) = wires_item.get_mut(ID_STR) { if let Some(new_id) = id_map.get(pvalue.as_str()) { *pvalue = new_id.to_string(); } @@ -315,7 +316,7 @@ fn preprocess_subflows(jv_root: JsonValue) -> crate::Result { if let Some(JsonValue::Array(out_props)) = node.get_mut("out") { for out_item in out_props.iter_mut() { for wires_item in out_item["wires"].as_array_mut().unwrap().iter_mut() { - if let Some(JsonValue::String(pvalue)) = wires_item.get_mut("id") { + if let Some(JsonValue::String(pvalue)) = wires_item.get_mut(ID_STR) { if let Some(new_id) = id_map.get(pvalue.as_str()) { *pvalue = new_id.to_string(); } @@ -358,7 +359,7 @@ pub trait RedFlowJsonObject { impl RedFlowJsonObject for JsonMap { fn get_flow_dependencies(&self, elements: &[JsonValue]) -> HashSet { - let this_id = self.get("id"); + let this_id = self.get(ID_STR); let child_nodes = elements.iter().filter(|x| x.get("z") == this_id); // `wires`` connects to other flow nodes, and in the Node-RED GUI editor, @@ -369,8 +370,8 @@ impl RedFlowJsonObject for JsonMap { .iter() .filter_map(|x| { if x.get("z") == this_id - && (option_value_equals_str(&x.get("type"), "link out") - || option_value_equals_str(&x.get("type"), "link call")) + && (option_value_equals_str(&x.get(TYPE_STR), "link out") + || option_value_equals_str(&x.get(TYPE_STR), "link call")) { x.get("links").and_then(|y| y.as_array()) } else { @@ -383,9 +384,9 @@ impl RedFlowJsonObject for JsonMap { elements .iter() .filter(|x| { - if let Some(flow_id) = x.get("id") { + if let Some(flow_id) = x.get(ID_STR) { wires_ids.contains(flow_id) - || (option_value_equals_str(&x.get("type"), "link in") && related_link_in_ids.contains(flow_id)) + || (option_value_equals_str(&x.get(TYPE_STR), "link in") && related_link_in_ids.contains(flow_id)) } else { false } @@ -397,16 +398,16 @@ impl RedFlowJsonObject for JsonMap { } fn get_subflow_dependencies(&self, elements: &[JsonValue]) -> HashSet { - let subflow_id = self.get("id").and_then(|x| x.as_str()).expect("Must have `id`"); + let subflow_id = self.get(ID_STR).and_then(|x| x.as_str()).expect("Must have `id`"); elements .iter() .filter_map(|x| x.as_object()) .filter(|o| { - o.get("type") + o.get(TYPE_STR) .and_then(|x| x.as_str()) .and_then(|x| x.split_once(':')) - .is_some_and(|x| x.0 == "subflow" && x.1 == subflow_id) + .is_some_and(|x| x.0 == SUB_FLOW_TYPE && x.1 == subflow_id) }) .filter_map(|o| o.get("z")) .filter_map(parse_red_id_value) @@ -445,7 +446,7 @@ impl RedFlowNodeJsonObject for JsonMap { // Add links if let Some(links) = self.get("links").and_then(|x| x.as_array()) { - let red_type = self.get("type").and_then(|x| x.as_str()); + let red_type = self.get(TYPE_STR).and_then(|x| x.as_str()); if red_type == Some("link out") || red_type == Some("link call") { let iter = links.iter().filter_map(parse_red_id_value); result.extend(iter); @@ -542,7 +543,7 @@ impl RedPropertyType { "date" => Ok(RedPropertyType::Date), "bin" => Ok(RedPropertyType::Bin), "msg" => Ok(RedPropertyType::Msg), - "flow" => Ok(RedPropertyType::Flow), + value if value== FLOW_STR => Ok(RedPropertyType::Flow), "global" => Ok(RedPropertyType::Global), "bool" => Ok(RedPropertyType::Bool), "jsonata" => Ok(RedPropertyType::Jsonata), @@ -733,21 +734,21 @@ fn preprocess_merge_subflow_env(flows: &mut JsonValue) -> crate::Result<()> { let elements = flows.as_array_mut().ok_or(EdgelinkError::BadArgument("flows"))?; let subflows: HashMap = elements .iter() - .filter(|x| x.get("type").and_then(|y| y.as_str()).map(|y| y == "subflow").unwrap_or(false)) - .filter(|x| x.get("env").is_some()) - .map(|e| (e.get("id").and_then(|x| x.as_str()).unwrap().to_string(), e.get("env").cloned().unwrap())) + .filter(|x| x.get(TYPE_STR).and_then(|y| y.as_str()).map(|y| y == SUB_FLOW_TYPE).unwrap_or(false)) + .filter(|x| x.get(ENV_STR).is_some()) + .map(|e| (e.get(ID_STR).and_then(|x| x.as_str()).unwrap().to_string(), e.get(ENV_STR).cloned().unwrap())) .collect(); for element in elements.iter_mut() { if let Some(("subflow", subflow_id)) = - element.get("type").and_then(|x| x.as_str()).and_then(|x| x.split_once(':')) + element.get(TYPE_STR).and_then(|x| x.as_str()).and_then(|x| x.split_once(':')) { if let Some(subflow_env) = subflows.get(subflow_id) { - let instance_env = if let Some(instance_env) = element.get_mut("env") { + let instance_env = if let Some(instance_env) = element.get_mut(ENV_STR) { instance_env } else { - element["env"] = JsonValue::Array(Vec::new()); - element.get_mut("env").unwrap() + element[ENV_STR] = JsonValue::Array(Vec::new()); + element.get_mut(ENV_STR).unwrap() }; merge_env(instance_env, subflow_env)?; } @@ -763,13 +764,13 @@ fn merge_env(target_envs: &mut JsonValue, ref_envs: &JsonValue) -> crate::Result let target_names: HashSet = target_vec .iter() - .filter_map(|item| item.get("name")) + .filter_map(|item| item.get(NAME_STR)) .filter_map(|name| name.as_str()) .map(|name| name.to_string()) .collect(); for item in ref_vec.iter() { - if let Some(name) = item.get("name").and_then(|name| name.as_str()) { + if let Some(name) = item.get(NAME_STR).and_then(|name| name.as_str()) { if !target_names.contains(name) { target_vec.push(item.clone()); } diff --git a/crates/core/src/runtime/model/json/mod.rs b/crates/core/src/runtime/model/json/mod.rs index 747f8dd3..211a0f41 100644 --- a/crates/core/src/runtime/model/json/mod.rs +++ b/crates/core/src/runtime/model/json/mod.rs @@ -5,6 +5,7 @@ use serde_json::Value as JsonValue; pub mod deser; pub mod helpers; +mod npdeser; #[derive(serde::Deserialize, Debug, PartialEq, Eq, PartialOrd, Ord, Clone)] pub struct RedPortConfig { diff --git a/crates/core/src/runtime/model/json/npdeser.rs b/crates/core/src/runtime/model/json/npdeser.rs new file mode 100644 index 00000000..5c5b5fae --- /dev/null +++ b/crates/core/src/runtime/model/json/npdeser.rs @@ -0,0 +1,140 @@ +//! [npdeser] this mod use to de deserializer node logic properties transport to real logic. +//! +//! # example +//! > this appendNewline config is belong to node red core node [file], Used to determine whether +//! > to wrap a file ,it's could It should be a boolean type, but the code logic allows it to be +//! > any non undefined, true false 0 and 1, and any character ,and any str. so need this mod handle +//! > this scene +//! ```js +//! this.appendNewline = n.appendNewline; +//! +//! if ((node.appendNewline) && (!Buffer.isBuffer(data)) && aflg) { data += os.EOL; } +//! ``` +//! +use std::borrow::Cow; +use std::str; +use serde::{de, Deserializer}; +use serde::de::{Error, Unexpected}; + +pub fn deser_bool_in_if_condition<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, +{ + struct BoolVisitor; + + impl<'de> de::Visitor<'de> for BoolVisitor { + type Value = bool; + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("a bool, convert failed") + } + fn visit_bytes(self, v: &[u8]) -> Result where E: Error { + match str::from_utf8(v) { + Ok(s) => { + if s =="" { + return Ok(false); + }else if s=="0" { + return Ok(false); + }else if s.contains("false")||s.contains("False")||s.contains("FALSE"){ + return Ok(false); + } + Ok(true) + }, + Err(_) => Err(Error::invalid_value(Unexpected::Bool(false), &self)), + } + } + fn visit_u64(self, v: u64) -> Result where E: Error { + println!("xxxxxxxxx"); + if v==0 { + return Ok(false); + } + Ok(true) + } + + + fn visit_f64(self, v: f64) -> Result where E: Error { + if v==0.0 { + return Ok(false); + } + Ok(true) + } + + fn visit_f32(self, v: f32) -> Result where E: Error { + if v==0.0 { + return Ok(false); + } + Ok(true) + } + + fn visit_str(self, v: &str) -> Result where E: Error { + if v =="" { + return Ok(false); + }else if v=="0" { + return Ok(false); + }else if v.contains("false")||v.contains("False")||v.contains("FALSE"){ + return Ok(false); + } + Ok(true) + } + } + + deserializer.deserialize_any(BoolVisitor) +} + + +#[cfg(test)] +mod tests { + use std::net::IpAddr; + use serde::Deserialize; + use serde_json::json; + use super::*; + + #[derive(Deserialize, Debug)] + struct TestNodeConfig { + #[serde(deserialize_with = "crate::runtime::model::json::npdeser::deser_bool_in_if_condition")] + test: bool + } + + #[test] + fn test_deser_bool_in_if_condition() { + let value_str = json!({"test":"xxx"}); + let result = TestNodeConfig::deserialize(value_str).unwrap(); + assert!(result.test); + + let value_str = json!({"test":"true"}); + let result = TestNodeConfig::deserialize(value_str).unwrap(); + assert!(result.test); + + + let value_str = json!({"test":"false"}); + let result = TestNodeConfig::deserialize(value_str).unwrap(); + assert!(!result.test); + + + let value_str = json!({"test":"False"}); + let result = TestNodeConfig::deserialize(value_str).unwrap(); + assert!(!result.test); + + let value_str = json!({"test":"0"}); + let result = TestNodeConfig::deserialize(value_str).unwrap(); + assert!(!result.test); + + let value_str = json!({"test":1.0}); + let result = TestNodeConfig::deserialize(value_str).unwrap(); + assert!(result.test); + + let value_str = json!({"test":0.0}); + let result = TestNodeConfig::deserialize(value_str).unwrap(); + assert!(!result.test); + + + let value_str = json!({"test":0}); + let result = TestNodeConfig::deserialize(value_str).unwrap(); + assert!(!result.test); + + + let value_str = json!({"test":1}); + let result = TestNodeConfig::deserialize(value_str).unwrap(); + assert!(result.test); + + } +} \ No newline at end of file diff --git a/crates/core/src/runtime/model/msg.rs b/crates/core/src/runtime/model/msg.rs index a0dd82ac..a7a47e68 100644 --- a/crates/core/src/runtime/model/msg.rs +++ b/crates/core/src/runtime/model/msg.rs @@ -398,6 +398,7 @@ mod tests { use super::*; use serde::Deserialize; use serde_json::json; + use crate::utils::constants::NAME_STR; #[test] fn test_get_nested_nav_property() { @@ -432,8 +433,8 @@ mod tests { assert!(old_foo.is_object()); assert_eq!(old_foo.as_object().unwrap()["bar"].as_str().unwrap(), "foo"); } - msg.set("name".into(), "world".into()); - assert_eq!(msg.get("name").unwrap().as_str().unwrap(), "world"); + msg.set(NAME_STR.into(), "world".into()); + assert_eq!(msg.get(NAME_STR).unwrap().as_str().unwrap(), "world"); msg.set_nav("foo.bar", "changed2".into(), false).unwrap(); assert_eq!(msg.get("foo").unwrap().as_object().unwrap().get("bar").unwrap().as_str().unwrap(), "changed2"); diff --git a/crates/core/src/runtime/nodes/function_nodes/change.rs b/crates/core/src/runtime/nodes/function_nodes/change.rs index 073ad585..32f0a97a 100644 --- a/crates/core/src/runtime/nodes/function_nodes/change.rs +++ b/crates/core/src/runtime/nodes/function_nodes/change.rs @@ -9,6 +9,7 @@ use crate::runtime::flow::Flow; use crate::runtime::model::*; use crate::runtime::nodes::*; use edgelink_macro::*; +use crate::utils::constants::FLOW_STR; #[derive(Debug)] #[flow_node("change")] @@ -492,7 +493,7 @@ fn handle_legacy_json(n: Value) -> crate::Result { } if let (Some(t), Some(fromt), Some(from)) = (rule.get("t"), rule.get("fromt"), rule.get("from")) { - if t == "change" && fromt != "msg" && fromt != "flow" && fromt != "global" { + if t == "change" && fromt != "msg" && fromt != FLOW_STR && fromt != "global" { let from_str = from.as_str().unwrap_or(""); let mut from_re = from_str.to_string(); diff --git a/crates/core/src/runtime/nodes/function_nodes/function/mod.rs b/crates/core/src/runtime/nodes/function_nodes/function/mod.rs index 9d7f7006..664b57ea 100644 --- a/crates/core/src/runtime/nodes/function_nodes/function/mod.rs +++ b/crates/core/src/runtime/nodes/function_nodes/function/mod.rs @@ -16,6 +16,7 @@ use crate::runtime::flow::Flow; use crate::runtime::model::*; use crate::runtime::nodes::*; use edgelink_macro::*; +use crate::utils::constants::ENV_STR; mod context_class; mod edgelink_class; @@ -341,7 +342,7 @@ impl FunctionNode { */ ::rquickjs_extra::timers::init(ctx)?; - ctx.globals().set("env", env_class::EnvClass::new(self.envs()))?; + ctx.globals().set(ENV_STR, env_class::EnvClass::new(self.envs()))?; ctx.globals().set("node", node_class::NodeClass::new(self))?; // Register the global-scoped context diff --git a/crates/core/src/runtime/nodes/mod.rs b/crates/core/src/runtime/nodes/mod.rs index 2126c78d..f0651ee6 100644 --- a/crates/core/src/runtime/nodes/mod.rs +++ b/crates/core/src/runtime/nodes/mod.rs @@ -15,10 +15,13 @@ use crate::runtime::model::json::{RedFlowNodeConfig, RedGlobalNodeConfig}; use crate::runtime::model::*; use crate::EdgelinkError; use crate::*; +use crate::utils::constants::FLOW_STR; pub(crate) mod common_nodes; mod function_nodes; +mod storage_nodes; + #[cfg(feature = "net")] mod network_nodes; @@ -258,7 +261,7 @@ where match node.recv_msg(cancel.clone()).await { Ok(msg) => { if let Err(ref err) = proc(node, msg.clone()).await { - let flow = node.flow().expect("flow"); + let flow = node.flow().expect(FLOW_STR); let error_message = err.to_string(); match flow.handle_error(node, &error_message, Some(msg.clone()), None, cancel.clone()).await { diff --git a/crates/core/src/runtime/nodes/storage_nodes/file.rs b/crates/core/src/runtime/nodes/storage_nodes/file.rs new file mode 100644 index 00000000..a0324279 --- /dev/null +++ b/crates/core/src/runtime/nodes/storage_nodes/file.rs @@ -0,0 +1,113 @@ +use std::net::{IpAddr, SocketAddr}; +use std::sync::Arc; +use tokio::net::UdpSocket; + +use base64::prelude::*; +use serde::Deserialize; + +use crate::runtime::flow::Flow; +use crate::runtime::nodes::*; +use edgelink_macro::*; +use validator::{Validate, ValidationError}; + + + + + +#[derive(Debug)] +#[flow_node("file")] +struct FileNode { + base: FlowNode, + config: FileNodeConfig, +} + +impl FileNode { + fn build(_flow: &Flow, state: FlowNode, config: &RedFlowNodeConfig) -> crate::Result> { + let udp_config = FileNodeConfig::deserialize(&config.rest)?; + + + + let node = FileNode { base: state, config: udp_config }; + Ok(Box::new(node)) + } +} + +#[derive(Deserialize,Validate, Debug)] +struct FileNodeConfig { + #[validate(length(min = 1))] + filename: String, + #[serde(rename = "filenameType")] + filename_type: Option, + #[serde( rename= "appendNewline")] + append_new_line: String, + #[serde( rename= "overwriteFile")] + overwrite_file: String, + create_dir: bool, + encoding: String, + +} + +impl FileNode { + async fn uow(&self, msg: MsgHandle, socket: &UdpSocket) -> crate::Result<()> { + let msg_guard = msg.read().await; + if let Some(payload) = msg_guard.get("payload") { + let remote_addr = std::net::SocketAddr::new( + self.config.addr.unwrap(), // TODO FIXME + self.config.port.unwrap(), + ); + + if let Some(bytes) = payload.as_bytes() { + if self.config.base64 { + let b64_str = BASE64_STANDARD.encode(bytes); + let bytes = b64_str.as_bytes(); + socket.send_to(bytes, remote_addr).await?; + } else { + socket.send_to(bytes, remote_addr).await?; + } + } + if let Some(bytes) = payload.to_bytes() { + socket.send_to(&bytes, remote_addr).await?; + } else { + log::warn!("Failed to convert payload into bytes"); + } + } + + Ok(()) + } +} + +#[async_trait] +impl FlowNodeBehavior for FileNode { + fn get_node(&self) -> &FlowNode { + &self.base + } + + async fn run(self: Arc, stop_token: CancellationToken) { + let local_addr: SocketAddr = match self.config.outport { + Some(port) => SocketAddr::new(self.config.iface.unwrap(), port), + _ => match self.config.ipv { + UdpIpV::V4 => "0.0.0.0:0".parse().unwrap(), + UdpIpV::V6 => "[::]:0".parse().unwrap(), + }, + }; + + match tokio::net::UdpSocket::bind(local_addr).await { + Ok(socket) => { + let socket = Arc::new(socket); + while !stop_token.is_cancelled() { + let cloned_socket = socket.clone(); + + let node = self.clone(); + with_uow(node.as_ref(), stop_token.clone(), |node, msg| async move { + node.uow(msg, &cloned_socket).await + }) + .await; + } + } + + Err(e) => { + log::error!("Can not bind local address: {:?}", e); + } + } + } +} diff --git a/crates/core/src/runtime/nodes/storage_nodes/file_in.rs b/crates/core/src/runtime/nodes/storage_nodes/file_in.rs new file mode 100644 index 00000000..e69de29b diff --git a/crates/core/src/runtime/nodes/storage_nodes/mod.rs b/crates/core/src/runtime/nodes/storage_nodes/mod.rs new file mode 100644 index 00000000..59e6f308 --- /dev/null +++ b/crates/core/src/runtime/nodes/storage_nodes/mod.rs @@ -0,0 +1,3 @@ +// mod file; +mod watch; +mod file_in; \ No newline at end of file diff --git a/crates/core/src/runtime/nodes/storage_nodes/watch.rs b/crates/core/src/runtime/nodes/storage_nodes/watch.rs new file mode 100644 index 00000000..e69de29b diff --git a/crates/core/src/utils/constants.rs b/crates/core/src/utils/constants.rs new file mode 100644 index 00000000..e1df1afa --- /dev/null +++ b/crates/core/src/utils/constants.rs @@ -0,0 +1,8 @@ +pub const SUB_FLOW_TYPE:&'static str="subflow"; +pub const SUB_FLOW_TYPE_HEAD:&'static str="subflow:"; +pub const TAB_STR:&'static str="tab"; +pub const FLOW_STR:&'static str="flow"; +pub const TYPE_STR:&'static str="type"; +pub const NAME_STR:&'static str="name"; +pub const ID_STR:&'static str="id"; +pub const ENV_STR:&'static str="env"; \ No newline at end of file diff --git a/crates/core/src/utils/handle.rs b/crates/core/src/utils/handle.rs new file mode 100644 index 00000000..4fb22d40 --- /dev/null +++ b/crates/core/src/utils/handle.rs @@ -0,0 +1,39 @@ + + + +/// [handle_option] 通用的msg处理方式实现error throw 的代码优雅程度。 +/// +/// # Example +/// +/// ```no_run +/// +/// struct LOC00007; +/// fn main()->Result<(),()>{ +/// use edgelink_core::handle_option; +/// handle_option!(result: Result::Ok(""), etype: LOC00007, "x".clone() ,"y".clone()); +/// } +/// +/// ``` +/// + +#[macro_export] +macro_rules! handle_option { + (result: $result:expr, $etype:expr , str:$arg:tt)=> { + match $result { + Some(re) => {re} + None => { + use crate::EdgelinkError; + return Err($etype($arg.to_string()).into()); + } + } + }; +} +#[macro_export] +macro_rules! get_json_value { + ( $result:expr, $key:tt, str)=> { + $result.get($key).and_then(|x| x.as_str()).unwrap_or("") + }; + ( $result:expr, $key:tt, string)=> { + $result.get($key).and_then(|x| x.to_string()).unwrap_or("".to_owned()) + }; +} \ No newline at end of file diff --git a/crates/core/src/utils/mod.rs b/crates/core/src/utils/mod.rs index 9730d9fe..4cf0b3d9 100644 --- a/crates/core/src/utils/mod.rs +++ b/crates/core/src/utils/mod.rs @@ -8,6 +8,8 @@ pub mod graph; pub mod time; pub mod topo; +pub(crate) mod constants; +mod handle; pub fn generate_uid() -> u64 { let mut rng = rand::thread_rng();