From c1649feae7c8943530c9f04cac70be6726849887 Mon Sep 17 00:00:00 2001 From: Raphael Date: Mon, 27 Apr 2026 18:08:01 +0200 Subject: [PATCH 1/6] dependencies: updated tucana to 0.0.69 --- Cargo.lock | 10 +++++----- Cargo.toml | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bf6ea6d..465f4a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -362,7 +362,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -1180,7 +1180,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -1773,9 +1773,9 @@ dependencies = [ [[package]] name = "tucana" -version = "0.0.68" +version = "0.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "abae78f798d1203bbcce361ba4cb4c500f8fe64e56d16ba3a5a0854e285377eb" +checksum = "2514ae6005ba6f36306aa718d06e80b520f073c8a5594eb1659cda7f56988dc9" dependencies = [ "pbjson", "pbjson-build", @@ -1896,7 +1896,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 0bcb602..916b6eb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ homepage = "https://code0.tech" license = "Apache-2.0" [dependencies] -tucana = { version = "0.0.68", features = ["aquila"] } +tucana = { version = "0.0.69", features = ["aquila"] } async-trait = "0.1.85" log = "0.4.24" tonic = "0.14.1" From c4b0ec77831bcb6bd421ae7d1783c9cf1d9f858c Mon Sep 17 00:00:00 2001 From: Raphael Date: Mon, 27 Apr 2026 18:17:33 +0200 Subject: [PATCH 2/6] feat: added new reader for new module structure --- src/flow_definition/feature/version.rs | 11 +- src/flow_definition/mod.rs | 476 +++++++++++++++---------- 2 files changed, 305 insertions(+), 182 deletions(-) diff --git a/src/flow_definition/feature/version.rs b/src/flow_definition/feature/version.rs index 51c0f76..31e5129 100644 --- a/src/flow_definition/feature/version.rs +++ b/src/flow_definition/feature/version.rs @@ -1,4 +1,6 @@ -use tucana::shared::{DefinitionDataType, FlowType, FunctionDefinition, RuntimeFunctionDefinition}; +use tucana::shared::{ + DefinitionDataType, FlowType, FunctionDefinition, RuntimeFlowType, RuntimeFunctionDefinition, +}; pub trait HasVersion { fn version(&self) -> &String; @@ -25,6 +27,13 @@ impl HasVersion for FunctionDefinition { &self.version } } + +impl HasVersion for RuntimeFlowType { + fn version(&self) -> &String { + &self.version + } +} + impl HasVersion for RuntimeFunctionDefinition { fn version(&self) -> &String { &self.version diff --git a/src/flow_definition/mod.rs b/src/flow_definition/mod.rs index ab473ac..f975f62 100644 --- a/src/flow_definition/mod.rs +++ b/src/flow_definition/mod.rs @@ -5,9 +5,13 @@ use crate::flow_definition::error::ReaderError; use crate::flow_definition::feature::Feature; use crate::flow_definition::feature::version::HasVersion; use serde::de::DeserializeOwned; +use serde::{Deserialize, Serialize}; use std::fs; -use std::path::Path; -use tucana::shared::{DefinitionDataType, FlowType, FunctionDefinition, RuntimeFunctionDefinition}; +use std::path::{Path, PathBuf}; +use tucana::shared::{ + DefinitionDataType, FlowType, FunctionDefinition, Module, ModuleConfigurationDefinition, + RuntimeFlowType, RuntimeFunctionDefinition, Translation, +}; use walkdir::WalkDir; pub struct Reader { @@ -17,6 +21,54 @@ pub struct Reader { path: String, } +#[derive(Serialize, Deserialize, Clone, Debug, Default)] +pub struct ModuleConfiguration { + pub identifier: String, + pub name: Vec, + pub description: Vec, + pub documentation: String, + pub author: String, + pub icon: String, + #[serde(default, skip_serializing_if = "String::is_empty")] + pub version: String, +} + +#[derive(Clone, Debug, Default)] +struct LoadedModule { + name: String, + config: ModuleConfiguration, + data_types: Vec, + flow_types: Vec, + runtime_flow_types: Vec, + functions: Vec, + runtime_functions: Vec, + configurations: Vec, +} + +impl LoadedModule { + fn into_module(mut self) -> Module { + if self.config.identifier.is_empty() { + self.config.identifier = self.name.clone(); + } + + Module { + identifier: self.config.identifier, + name: self.config.name, + description: self.config.description, + documentation: self.config.documentation, + author: self.config.author, + icon: self.config.icon, + version: self.config.version, + flow_types: self.flow_types, + runtime_flow_types: self.runtime_flow_types, + function_definitions: self.functions, + runtime_function_definitions: self.runtime_functions, + definition_data_types: self.data_types, + configurations: self.configurations, + } + } +} + impl Reader { pub fn configure( path: String, @@ -33,210 +85,272 @@ impl Reader { } pub fn read_features(&self) -> Result, ReaderError> { - let definitions = Path::new(&self.path); - - match self.read_feature_content(definitions) { - Ok(features) => { - log::info!( - "Loaded {:?} feature/s", - &features - .iter() - .map(|f| f.name.clone()) - .collect::>() - ); - - log::debug!( - "Found FlowTypes {:?}", - &features - .iter() - .flat_map(|f| f.flow_types.iter().map(|t| t.identifier.clone())) - .collect::>() - ); - - log::debug!( - "Found DataTypes {:?}", - &features - .iter() - .flat_map(|f| f.data_types.iter().map(|t| t.identifier.clone())) - .collect::>() - ); - - log::debug!( - "Found RuntimeFunctions {:?}", - &features - .iter() - .flat_map(|f| f.runtime_functions.iter().map(|t| t.runtime_name.clone())) - .collect::>() - ); - - Ok(features) - } - Err(err) => { - log::error!("Failed to read feature/s from {}, {:?}", &self.path, err); - Err(ReaderError::ReadFeatureError { - path: self.path.to_string(), - source: Box::new(err), - }) - } - } + let modules = self + .read_loaded_modules() + .map_err(|err| ReaderError::ReadFeatureError { + path: self.path.clone(), + source: Box::new(err), + })?; + + Ok(modules + .into_iter() + .map(|module| Feature { + name: module.name, + data_types: module.data_types, + flow_types: module.flow_types, + runtime_functions: module.runtime_functions, + functions: module.functions, + }) + .collect()) } - fn read_feature_content(&self, dir: &Path) -> Result, ReaderError> { - let mut features: Vec = Vec::new(); + pub fn read_modules(&self) -> Result, ReaderError> { + let modules = self + .read_loaded_modules() + .map_err(|err| ReaderError::ReadFeatureError { + path: self.path.clone(), + source: Box::new(err), + })?; - let readdir = fs::read_dir(dir).map_err(|err| { - log::error!("Failed to read directory {}: {:?}", dir.display(), err); - ReaderError::ReadDirectoryError { - path: dir.to_path_buf(), - error: err, - } - })?; - - for entry_result in readdir { - let entry = match entry_result { - Ok(entry) => entry, - Err(err) => { - log::error!("Failed to read directory entry: {:?}", err); - return Err(ReaderError::DirectoryEntryError(err)); - } - }; + Ok(modules.into_iter().map(LoadedModule::into_module).collect()) + } - let path = entry.path(); + fn read_loaded_modules(&self) -> Result, ReaderError> { + let root = Path::new(&self.path); + if !root.exists() || !root.is_dir() { + return Err(ReaderError::ReadDirectoryError { + path: root.to_path_buf(), + error: std::io::Error::new( + std::io::ErrorKind::NotFound, + format!("Definition path {} does not exist", root.display()), + ), + }); + } - if !path.is_dir() { + let mut modules = Vec::new(); + for module_dir in find_module_directories(root) { + let module_name = module_name_from_paths(root, &module_dir); + if !self.feature_allowed(&module_name, &module_dir) { continue; } - let feature_name = path - .file_name() - .unwrap_or_default() - .to_string_lossy() - .to_string(); + let mut module = LoadedModule { + name: module_name, + ..Default::default() + }; - if !self.accepted_features.is_empty() && !self.accepted_features.contains(&feature_name) - { - log::info!("Skipping not accepted feature: {}", feature_name); - continue; + let module_file = module_dir.join("module.json"); + if module_file.is_file() { + match read_json_file::(&module_file) { + Ok(config) => module.config = config, + Err(err) if self.should_break => return Err(err), + Err(err) => log::warn!( + "Skipping invalid module definition {}: {:?}", + module_file.display(), + err + ), + } } - let data_types = match self - .load_definitions_for_feature::(&path, "data_types")? - { - Some(v) => v, - None => continue, - }; - - let flow_types = - match self.load_definitions_for_feature::(&path, "flow_types")? { - Some(v) => v, - None => continue, - }; - - let runtime_functions = match self - .load_definitions_for_feature::( - &path, - "runtime_functions", - )? { - Some(v) => v, - None => continue, - }; + let entries = + fs::read_dir(&module_dir).map_err(|error| ReaderError::ReadDirectoryError { + path: module_dir.clone(), + error, + })?; + + for entry in entries { + let entry = entry.map_err(ReaderError::DirectoryEntryError)?; + let file_type = entry + .file_type() + .map_err(ReaderError::DirectoryEntryError)?; + if !file_type.is_dir() { + continue; + } - let functions = match self - .load_definitions_for_feature::(&path, "functions")? - { - Some(v) => v, - None => continue, - }; + let path = entry.path(); + let dir_name = entry.file_name().to_string_lossy().to_string(); - let feature = Feature { - name: feature_name, - data_types, - flow_types, - runtime_functions, - functions, - }; + match dir_name.as_str() { + "flow_type" | "flow_types" => { + module + .flow_types + .extend(load_json_dir::(&path, self.should_break)?); + } + "runtime_flow_type" | "runtime_flow_types" => { + module + .runtime_flow_types + .extend(load_json_dir::(&path, self.should_break)?); + } + "data_type" | "data_types" => { + module + .data_types + .extend(load_json_dir::( + &path, + self.should_break, + )?); + } + "runtime_definition" | "runtime_definitions" | "runtime_functions" => { + module.runtime_functions.extend( + load_json_dir::(&path, self.should_break)?, + ); + } + "function" | "functions" => { + module.functions.extend(load_json_dir::( + &path, + self.should_break, + )?); + } + "configuration" | "configurations" => { + module.configurations.extend( + load_json_dir::( + &path, + self.should_break, + )?, + ); + } + _ => {} + } + } - features.push(feature); + module + .data_types + .retain(|item| item.is_accepted(&self.accepted_version)); + module + .flow_types + .retain(|item| item.is_accepted(&self.accepted_version)); + module + .runtime_flow_types + .retain(|item| item.is_accepted(&self.accepted_version)); + module + .functions + .retain(|item| item.is_accepted(&self.accepted_version)); + module + .runtime_functions + .retain(|item| item.is_accepted(&self.accepted_version)); + + modules.push(module); } - Ok(features) + Ok(modules) } - fn load_definitions_for_feature( - &self, - feature_dir: &Path, - sub_dir: &str, - ) -> Result>, ReaderError> - where - T: DeserializeOwned + HasVersion, - { - let dir = feature_dir.join(sub_dir); - - let raw: Vec = match self.collect_definitions::(&dir) { - Ok(v) => v, - Err(err) => { - if self.should_break { - return Err(ReaderError::ReadFeatureError { - path: dir.to_string_lossy().to_string(), - source: Box::new(err), - }); - } else { - // Skip this feature if we shouldn't break on error - return Ok(None); - } - } - }; + fn feature_allowed(&self, module_name: &str, module_path: &Path) -> bool { + if self.accepted_features.is_empty() { + return true; + } - let items = raw - .into_iter() - .filter(|v| v.is_accepted(&self.accepted_version)) - .collect(); + let short_name = module_path + .file_name() + .and_then(|name| name.to_str()) + .unwrap_or_default(); - Ok(Some(items)) + self.accepted_features + .iter() + .any(|feature| feature == module_name || feature == short_name) } +} - fn collect_definitions(&self, dir: &Path) -> Result, ReaderError> - where - T: DeserializeOwned, - { - let mut definitions = Vec::new(); +fn read_json_file(path: &Path) -> Result { + let content = fs::read_to_string(path).map_err(|error| ReaderError::ReadFileError { + path: path.to_path_buf(), + error, + })?; - if !dir.exists() { - return Ok(definitions); - } + serde_json::from_str::(&content).map_err(|error| ReaderError::JsonError { + path: path.to_path_buf(), + error, + }) +} - for entry in WalkDir::new(dir).into_iter().filter_map(Result::ok) { - let path = entry.path(); - - if path.is_file() && path.extension().is_some_and(|ext| ext == "json") { - let content = match fs::read_to_string(path) { - Ok(content) => content, - Err(err) => { - log::error!("Failed to read file {}: {}", path.display(), err); - return Err(ReaderError::ReadFileError { - path: path.to_path_buf(), - error: err, - }); - } - }; - - match serde_json::from_str::(&content) { - Ok(def) => definitions.push(def), - Err(e) => { - if self.should_break { - log::error!("Failed to parse JSON in file {}: {:?}", path.display(), e); - return Err(ReaderError::JsonError { - path: path.to_path_buf(), - error: e, - }); - } else { - log::warn!("Skipping invalid JSON file {}: {:?}", path.display(), e); - } - } - } - } +fn load_json_dir( + dir: &Path, + should_break: bool, +) -> Result, ReaderError> { + let mut items = Vec::new(); + for file in WalkDir::new(dir) + .into_iter() + .filter_map(Result::ok) + .map(|entry| entry.into_path()) + .filter(|path| { + path.is_file() + && path + .extension() + .and_then(|ext| ext.to_str()) + .is_some_and(|ext| ext.eq_ignore_ascii_case("json")) + }) + { + match read_json_file::(&file.as_path()) { + Ok(item) => items.push(item), + Err(err) if should_break => return Err(err), + Err(err) => log::warn!("Skipping invalid definition {}: {:?}", file.display(), err), } + } + + Ok(items) +} + +fn find_module_directories(root: &Path) -> Vec { + let mut modules = WalkDir::new(root) + .into_iter() + .filter_map(Result::ok) + .filter(|entry| entry.file_type().is_dir()) + .map(|entry| entry.into_path()) + .filter(|path| looks_like_module(path)) + .collect::>(); + modules.sort(); + modules +} + +fn looks_like_module(path: &Path) -> bool { + let entries = match fs::read_dir(path) { + Ok(entries) => entries, + Err(_) => return false, + }; + + entries.flatten().any(|entry| { + let file_type = match entry.file_type() { + Ok(file_type) => file_type, + Err(_) => return false, + }; + + let name = entry.file_name().to_string_lossy().to_string(); + name == "module.json" || (file_type.is_dir() && is_definition_dir(&name)) + }) +} + +fn is_definition_dir(name: &str) -> bool { + matches!( + name, + "flow_type" + | "flow_types" + | "runtime_flow_type" + | "runtime_flow_types" + | "data_type" + | "data_types" + | "runtime_definition" + | "runtime_definitions" + | "runtime_functions" + | "function" + | "functions" + | "configuration" + | "configurations" + ) +} - Ok(definitions) +fn module_name_from_paths(root: &Path, module_path: &Path) -> String { + let relative = module_path + .strip_prefix(root) + .ok() + .and_then(|path| path.to_str()) + .unwrap_or_default(); + + if relative.is_empty() || relative == "." { + module_path + .file_name() + .and_then(|name| name.to_str()) + .unwrap_or("module") + .to_string() + } else { + relative.to_string() } } From a78d091e1d4b647d3714bc4c26ce5baeb7dd8728 Mon Sep 17 00:00:00 2001 From: Raphael Date: Mon, 27 Apr 2026 18:17:52 +0200 Subject: [PATCH 3/6] feat: replaced old definition services with new module service --- src/flow_service/mod.rs | 230 +++++++--------------------------------- 1 file changed, 40 insertions(+), 190 deletions(-) diff --git a/src/flow_service/mod.rs b/src/flow_service/mod.rs index 8bd43f7..cf262d5 100644 --- a/src/flow_service/mod.rs +++ b/src/flow_service/mod.rs @@ -4,26 +4,15 @@ use crate::{ }; use tonic::{Extensions, Request, transport::Channel}; use tucana::{ - aquila::{ - DataTypeUpdateRequest, FlowTypeUpdateRequest, FunctionDefinitionUpdateRequest, - RuntimeFunctionDefinitionUpdateRequest, data_type_service_client::DataTypeServiceClient, - flow_type_service_client::FlowTypeServiceClient, - function_definition_service_client::FunctionDefinitionServiceClient, - runtime_function_definition_service_client::RuntimeFunctionDefinitionServiceClient, - }, - shared::{ - DefinitionDataType as DataType, FlowType, FunctionDefinition, RuntimeFunctionDefinition, - }, + aquila::{ModuleUpdateRequest, module_service_client::ModuleServiceClient}, + shared::Module, }; pub mod auth; pub mod retry; pub struct FlowUpdateService { - data_types: Vec, - runtime_functions: Vec, - functions: Vec, - flow_types: Vec, + modules: Vec, channel: Channel, aquila_token: String, definition_source: Option, @@ -34,35 +23,19 @@ impl FlowUpdateService { /// /// This will read the definition files from the given path and initialize the service with the data types, runtime function definitions, function definitions, and flow types. pub async fn from_url(aquila_url: String, definition_path: &str, aquila_token: String) -> Self { - let mut data_types = Vec::new(); - let mut runtime_functions = Vec::new(); - let mut functions = Vec::new(); - let mut flow_types = Vec::new(); - let reader = Reader::configure(definition_path.to_string(), true, vec![], None); - - let features = match reader.read_features() { - Ok(features) => features, + let modules = match reader.read_modules() { + Ok(modules) => modules, Err(error) => { log::error!("Error occurred while reading definitions: {:?}", error); panic!("Error occurred while reading definitions") } }; - for feature in features { - data_types.append(&mut feature.data_types.clone()); - flow_types.append(&mut feature.flow_types.clone()); - runtime_functions.append(&mut feature.runtime_functions.clone()); - functions.append(&mut feature.functions.clone()); - } - let channel = create_channel_with_retry("Aquila", aquila_url).await; Self { - data_types, - runtime_functions, - functions, - flow_types, + modules, channel, aquila_token, definition_source: None, @@ -74,194 +47,71 @@ impl FlowUpdateService { self } - pub fn with_flow_types(mut self, flow_types: Vec) -> Self { - self.flow_types = flow_types; - self - } - - pub fn with_data_types(mut self, data_types: Vec) -> Self { - self.data_types = data_types; - self - } - - pub fn with_runtime_functions( - mut self, - runtime_functions: Vec, - ) -> Self { - self.runtime_functions = runtime_functions; - self - } - - pub fn with_functions(mut self, functions: Vec) -> Self { - self.functions = functions; - self - } - pub async fn send(&mut self) { let _ = self.send_with_status().await; } pub async fn send_with_status(&mut self) -> bool { - let data_types_success = self.update_data_types().await; - let runtime_functions_success = self.update_runtime_functions().await; - let functions_success = self.update_functions().await; - let flow_types_success = self.update_flow_types().await; - data_types_success && runtime_functions_success && functions_success && flow_types_success + self.update().await } - async fn update_data_types(&mut self) -> bool { - if self.data_types.is_empty() { - log::info!("No DataTypes present."); + async fn update(&mut self) -> bool { + if self.modules.is_empty() { + log::info!("No Modules are present, aboarting update."); return true; } + let mut modules = self.modules.clone(); if let Some(source) = &self.definition_source { - for data_type in self.data_types.iter_mut() { - data_type.definition_source = source.to_string(); - } + modules = modules + .into_iter() + .map(|module| apply_definition_source_to_module(module, source.clone())) + .collect::>(); } - log::info!("Updating {} DataTypes.", self.data_types.len()); - let mut client = DataTypeServiceClient::new(self.channel.clone()); + log::info!("Updating {} Modules.", self.modules.len()); + let mut client = ModuleServiceClient::new(self.channel.clone()); let request = Request::from_parts( get_authorization_metadata(&self.aquila_token), Extensions::new(), - DataTypeUpdateRequest { - data_types: self.data_types.clone(), - }, + ModuleUpdateRequest { modules }, ); match client.update(request).await { Ok(response) => { let res = response.into_inner(); - log::info!( - "Was the update of the DataTypes accepted by Sagittarius? {}", - res.success - ); - - res.success - } - Err(err) => { - log::error!("Failed to update data types: {:?}", err); - false - } - } - } - - async fn update_functions(&mut self) -> bool { - if self.functions.is_empty() { - log::info!("No FunctionDefinitions present."); - return true; - } - if let Some(source) = &self.definition_source { - for function in self.functions.iter_mut() { - function.definition_source = source.to_string(); - } - }; - - log::info!("Updating {} FunctionDefinitions.", self.functions.len()); - let mut client = FunctionDefinitionServiceClient::new(self.channel.clone()); - let request = Request::from_parts( - get_authorization_metadata(&self.aquila_token), - Extensions::new(), - FunctionDefinitionUpdateRequest { - functions: self.functions.clone(), - }, - ); + match res.success { + true => log::info!("Module definition update has been successful"), + false => log::warn!("Module definition update has been unsuccessful"), + }; - match client.update(request).await { - Ok(response) => { - let res = response.into_inner(); - log::info!( - "Was the update of the FunctionDefinitions accepted by Sagittarius? {}", - res.success - ); res.success } Err(err) => { - log::error!("Failed to update function definitions: {:?}", err); + log::error!("Module definition update failed. Reason: {:?}", err); false } } } +} - async fn update_runtime_functions(&mut self) -> bool { - if self.runtime_functions.is_empty() { - log::info!("No RuntimeFunctionDefinitions present."); - return true; - } - - if let Some(source) = &self.definition_source { - for runtime_function in self.runtime_functions.iter_mut() { - runtime_function.definition_source = source.to_string(); - } - } - - log::info!( - "Updating {} RuntimeFunctionDefinitions.", - self.runtime_functions.len() - ); - let mut client = RuntimeFunctionDefinitionServiceClient::new(self.channel.clone()); - let request = Request::from_parts( - get_authorization_metadata(&self.aquila_token), - Extensions::new(), - RuntimeFunctionDefinitionUpdateRequest { - runtime_functions: self.runtime_functions.clone(), - }, - ); - - match client.update(request).await { - Ok(response) => { - let res = response.into_inner(); - log::info!( - "Was the update of the RuntimeFunctionDefinitions accepted by Sagittarius? {}", - res.success - ); - res.success - } - Err(err) => { - log::error!("Failed to update runtime function definitions: {:?}", err); - false - } - } +fn apply_definition_source_to_module(mut module: Module, source: String) -> Module { + for data_type in &mut module.definition_data_types { + data_type.definition_source = source.clone(); } - - async fn update_flow_types(&mut self) -> bool { - if self.flow_types.is_empty() { - log::info!("No FlowTypes present."); - return true; - } - - if let Some(source) = &self.definition_source { - for flow_type in self.flow_types.iter_mut() { - flow_type.definition_source = Some(source.to_string()); - } - } - - log::info!("Updating {} FlowTypes.", self.flow_types.len()); - let mut client = FlowTypeServiceClient::new(self.channel.clone()); - let request = Request::from_parts( - get_authorization_metadata(&self.aquila_token), - Extensions::new(), - FlowTypeUpdateRequest { - flow_types: self.flow_types.clone(), - }, - ); - - match client.update(request).await { - Ok(response) => { - let res = response.into_inner(); - log::info!( - "Was the update of the FlowTypes accepted by Sagittarius? {}", - res.success - ); - res.success - } - Err(err) => { - log::error!("Failed to update flow types: {:?}", err); - false - } - } + for flow_type in &mut module.flow_types { + flow_type.definition_source = Some(source.clone()); + } + for runtime_flow_type in &mut module.runtime_flow_types { + runtime_flow_type.definition_source = Some(source.clone()); + } + for function in &mut module.function_definitions { + function.definition_source = source.clone(); } + for runtime_function in &mut module.runtime_function_definitions { + runtime_function.definition_source = source.clone(); + } + + module } From e447efd770cb19cc41c87fd6aa0e97503df39ca4 Mon Sep 17 00:00:00 2001 From: Raphael Date: Mon, 27 Apr 2026 18:19:26 +0200 Subject: [PATCH 4/6] ref: cargo clippy --- src/flow_definition/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flow_definition/mod.rs b/src/flow_definition/mod.rs index f975f62..b392f84 100644 --- a/src/flow_definition/mod.rs +++ b/src/flow_definition/mod.rs @@ -279,7 +279,7 @@ fn load_json_dir( .is_some_and(|ext| ext.eq_ignore_ascii_case("json")) }) { - match read_json_file::(&file.as_path()) { + match read_json_file::(file.as_path()) { Ok(item) => items.push(item), Err(err) if should_break => return Err(err), Err(err) => log::warn!("Skipping invalid definition {}: {:?}", file.display(), err), From 364640837634400330090cb33d69bcce3cf74841 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20G=C3=B6tz?= <52959657+raphael-goetz@users.noreply.github.com> Date: Mon, 27 Apr 2026 19:38:35 +0200 Subject: [PATCH 5/6] Update src/flow_service/mod.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Raphael Götz <52959657+raphael-goetz@users.noreply.github.com> --- src/flow_service/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/flow_service/mod.rs b/src/flow_service/mod.rs index cf262d5..8967253 100644 --- a/src/flow_service/mod.rs +++ b/src/flow_service/mod.rs @@ -21,7 +21,8 @@ pub struct FlowUpdateService { impl FlowUpdateService { /// Create a new FlowUpdateService instance from an Aquila URL and a definition path. /// - /// This will read the definition files from the given path and initialize the service with the data types, runtime function definitions, function definitions, and flow types. + /// This reads the definition files from the given path as modules and initializes the + /// service with those module definitions. pub async fn from_url(aquila_url: String, definition_path: &str, aquila_token: String) -> Self { let reader = Reader::configure(definition_path.to_string(), true, vec![], None); let modules = match reader.read_modules() { From 8dbe267d32b8736f87a8b3e251c13b90a816ddc7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20G=C3=B6tz?= <52959657+raphael-goetz@users.noreply.github.com> Date: Mon, 27 Apr 2026 19:38:47 +0200 Subject: [PATCH 6/6] Update src/flow_service/mod.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Raphael Götz <52959657+raphael-goetz@users.noreply.github.com> --- src/flow_service/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flow_service/mod.rs b/src/flow_service/mod.rs index 8967253..bc20af1 100644 --- a/src/flow_service/mod.rs +++ b/src/flow_service/mod.rs @@ -58,7 +58,7 @@ impl FlowUpdateService { async fn update(&mut self) -> bool { if self.modules.is_empty() { - log::info!("No Modules are present, aboarting update."); + log::info!("No Modules are present, aborting update."); return true; }