From ba5e2db7615597392bff6d927c9f22335e768f12 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Wed, 29 Apr 2026 13:19:49 +0200 Subject: [PATCH] DPL: allow for configurable CCDB paths --- .../CCDBSupport/src/AnalysisCCDBHelpers.cxx | 34 +++++++++++++------ .../Core/include/Framework/AnalysisHelpers.h | 14 ++++---- .../Core/include/Framework/Configurable.h | 29 +++++++++++++--- Framework/Core/src/AnalysisHelpers.cxx | 1 + Framework/Core/src/ArrowSupport.cxx | 29 ++++++++++++++++ .../TestWorkflows/src/o2TestAnalysisCCDB.cxx | 18 +++++++++- 6 files changed, 102 insertions(+), 23 deletions(-) diff --git a/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx b/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx index c65ef2903db59..21fdae4a57760 100644 --- a/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx +++ b/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx @@ -21,7 +21,7 @@ #include "Framework/Signpost.h" #include "Framework/DanglingEdgesContext.h" #include "Framework/ConfigContext.h" -#include "Framework/ConfigContext.h" +#include "Framework/ConfigParamsHelper.h" #include #include #include @@ -71,31 +71,45 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/) { return adaptStateful([](ConfigParamRegistry const& options, DeviceSpec const& spec, InitContext& ic) { auto& dec = ic.services().get(); + // The effective default for each ccdb: option was already resolved at topology + // time by ArrowSupport (consulting task Configurables) and registered on this + // device's options. Here we just read the final value — honouring any further + // runtime override supplied via CLI or JSON config. + std::unordered_map ccdbUrls; + for (auto& input : dec.analysisCCDBInputs) { + for (auto& m : input.metadata) { + if (!m.name.starts_with("ccdb:") || ccdbUrls.count(m.name)) { + continue; + } + std::string url = m.defaultValue.asString(); + if (ConfigParamsHelper::hasOption(spec.options, m.name)) { + url = options.get(m.name.c_str()); + } + LOGP(info, "CCDB path resolved for {}: {}", m.name, url); + ccdbUrls.emplace(m.name, std::move(url)); + } + } std::vector> schemas; - auto schemaMetadata = std::make_shared(); - for (auto& input : dec.analysisCCDBInputs) { + auto schemaMetadata = std::make_shared(); std::vector> fields; schemaMetadata->Append("outputRoute", DataSpecUtils::describe(input)); schemaMetadata->Append("outputBinding", input.binding); - for (auto& m : input.metadata) { - // Save the list of input tables if (m.name.starts_with("input:")) { auto name = m.name.substr(6); schemaMetadata->Append("sourceTable", name); schemaMetadata->Append("sourceMatcher", DataSpecUtils::describe(std::get(DataSpecUtils::fromMetadataString(m.defaultValue.get()).matcher))); continue; } - // Ignore the non ccdb: entries if (!m.name.starts_with("ccdb:")) { continue; } - // Create the schema of the output - auto metadata = std::make_shared(); - metadata->Append("url", m.defaultValue.asString()); + auto fieldMetadata = std::make_shared(); + auto it = ccdbUrls.find(m.name); + fieldMetadata->Append("url", it != ccdbUrls.end() ? it->second : m.defaultValue.asString()); auto columnName = m.name.substr(strlen("ccdb:")); - fields.emplace_back(std::make_shared(columnName, arrow::binary_view(), false, metadata)); + fields.emplace_back(std::make_shared(columnName, arrow::binary_view(), false, fieldMetadata)); } schemas.emplace_back(std::make_shared(fields, schemaMetadata)); } diff --git a/Framework/Core/include/Framework/AnalysisHelpers.h b/Framework/Core/include/Framework/AnalysisHelpers.h index bfc5a02891dad..cfd2f357ba06f 100644 --- a/Framework/Core/include/Framework/AnalysisHelpers.h +++ b/Framework/Core/include/Framework/AnalysisHelpers.h @@ -172,7 +172,7 @@ struct Builder { std::shared_ptr materialize(ProcessingContext& pc); }; -} // namespace o2::framework +} // namespace o2::framework namespace o2::soa { @@ -394,7 +394,7 @@ constexpr auto getIndexMetadata() -> std::vector return {}; } -} // namespace +} // namespace template constexpr auto tableRef2InputSpec() @@ -463,7 +463,7 @@ constexpr auto tableRef2OutputRef() o2::aod::label(), R.version}; } -} // namespace o2::soa +} // namespace o2::soa namespace o2::framework { @@ -672,7 +672,7 @@ struct Spawns : decltype(transformBase()) { std::shared_ptr table = nullptr; std::shared_ptr extension = nullptr; - std::array projectors = [](framework::pack) -> std::array + std::array projectors = [](framework::pack)->std::array { return {{std::move(C::Projector())...}}; } @@ -1077,7 +1077,7 @@ concept is_partition = requires(T t) { requires std::same_as; requires std::same_as>>; }; -} // namespace o2::framework +} // namespace o2::framework namespace o2::soa { @@ -1100,6 +1100,6 @@ auto Attach(T const& table) using output_t = Join, o2::aod::Hash<"JOIN/0"_h>, o2::aod::Hash<"JOIN"_h>, Cs...>>; return output_t{{table.asArrowTable()}, table.offset()}; } -} // namespace o2::soa +} // namespace o2::soa -#endif // o2_framework_AnalysisHelpers_H_DEFINED +#endif // o2_framework_AnalysisHelpers_H_DEFINED diff --git a/Framework/Core/include/Framework/Configurable.h b/Framework/Core/include/Framework/Configurable.h index 0931884da1ff7..3cbd1839b7d89 100644 --- a/Framework/Core/include/Framework/Configurable.h +++ b/Framework/Core/include/Framework/Configurable.h @@ -83,6 +83,26 @@ struct Configurable : IP { template using MutableConfigurable = Configurable>; +/// Convenience wrapper for overriding the CCDB path of a CCDB column declared +/// with DECLARE_SOA_CCDB_COLUMN / DECLARE_SOA_CCDB_COLUMN_FULL. +/// +/// The option name, default value, and help string are all derived automatically +/// from the column type: name = "ccdb:" + Column::mLabel, default = Column::query. +/// +/// Example: +/// struct MyTask { +/// ConfigurableCCDBPath lhcPhasePath; +/// }; +template +struct ConfigurableCCDBPath : Configurable { + ConfigurableCCDBPath() + : Configurable{std::string{"ccdb:"} + Column::mLabel, + std::string{Column::query}, + std::string{"CCDB path for "} + Column::mLabel + " (default: " + Column::query + ")"} + { + } +}; + template concept is_configurable = requires(T t) { requires std::same_as; @@ -93,11 +113,10 @@ concept is_configurable = requires(T t) { using ConfigurableAxis = Configurable, ConfigParamKind::kAxisSpec, ConfigurablePolicyConst, ConfigParamKind::kAxisSpec>>; template -concept is_configurable_axis = is_configurable&& - requires() -{ - T::kind == ConfigParamKind::kAxisSpec; -}; +concept is_configurable_axis = is_configurable && + requires() { + T::kind == ConfigParamKind::kAxisSpec; + }; template struct ProcessConfigurable : Configurable { diff --git a/Framework/Core/src/AnalysisHelpers.cxx b/Framework/Core/src/AnalysisHelpers.cxx index b7eac692d3859..149664c42caba 100644 --- a/Framework/Core/src/AnalysisHelpers.cxx +++ b/Framework/Core/src/AnalysisHelpers.cxx @@ -201,4 +201,5 @@ std::shared_ptr Builder::materialize(ProcessingContext& pc) result = o2::soa::IndexBuilder::materialize(*builders.get(), std::move(tables), records, outputSchema, exclusive); return result; } + } // namespace o2::framework diff --git a/Framework/Core/src/ArrowSupport.cxx b/Framework/Core/src/ArrowSupport.cxx index 780c836437c2b..eecff4ce87c74 100644 --- a/Framework/Core/src/ArrowSupport.cxx +++ b/Framework/Core/src/ArrowSupport.cxx @@ -34,6 +34,7 @@ #include "Framework/ServiceRegistryHelpers.h" #include "Framework/Signpost.h" #include "Framework/DefaultsHelpers.h" +#include "Framework/ConfigParamsHelper.h" #include "CommonMessageBackendsHelpers.h" #include @@ -637,6 +638,34 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() analysisCCDB->outputs.clear(); analysisCCDB->inputs.clear(); AnalysisSupportHelpers::addMissingOutputsToBuilder(dec.analysisCCDBInputs, dec.requestedAODs, dec.requestedDYNs, *analysisCCDB); + // Register each ccdb: column path as an actual device option on the CCDB + // device so it can be read from ConfigParamRegistry at runtime. + // If any analysis task declared a Configurable with the same + // "ccdb:fXxx" name, prefer its default over the compile-time ::query value. + // First encountered wins; log a warning if two tasks declare conflicting defaults. + for (auto& input : dec.analysisCCDBInputs) { + for (auto& m : input.metadata | std::views::filter(checks::has_params_with_name_starting("ccdb:"))) { + ConfigParamSpec effective = m; // start with compile-time default + bool foundFirst = false; + for (auto& d : workflow | views::exclude_by_name(analysisCCDB->name)) { + for (auto& opt : d.options) { + if (opt.name == m.name) { + if (!foundFirst) { + effective = opt; // first task Configurable wins + foundFirst = true; + } else if (opt.defaultValue.asString() != effective.defaultValue.asString()) { + LOGP(warn, "Task '{}' declares Configurable '{}' = '{}' which conflicts " + "with an earlier value '{}'; earlier value will be used.", + d.name, opt.name, opt.defaultValue.asString(), + effective.defaultValue.asString()); + } + break; + } + } + } + ConfigParamsHelper::addOptionIfMissing(analysisCCDB->options, effective); + } + } // load real AlgorithmSpec before deployment analysisCCDB->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "AnalysisCCDBFetcherPlugin", ctx); } diff --git a/Framework/TestWorkflows/src/o2TestAnalysisCCDB.cxx b/Framework/TestWorkflows/src/o2TestAnalysisCCDB.cxx index f9684762539f7..3cf20d9ff5296 100644 --- a/Framework/TestWorkflows/src/o2TestAnalysisCCDB.cxx +++ b/Framework/TestWorkflows/src/o2TestAnalysisCCDB.cxx @@ -51,8 +51,11 @@ struct DummyTimestampsTable { }; struct SimpleCCDBConsumer { + ConfigurableCCDBPath lhcPhasePath; + void process(o2::aod::TOFCalibrationObjects const& ccdbObjectsForAllTimestamps) { + LOGP(info, "LHCphase CCDB path configurable value: {}", lhcPhasePath.value); LOGP(info, "Looking at all the LHCphases associated to the timestamps"); for (auto& object : ccdbObjectsForAllTimestamps) { std::cout << object.lhcPhase().getStartValidity() << " " << object.lhcPhase().getEndValidity() << std::endl; @@ -60,10 +63,23 @@ struct SimpleCCDBConsumer { } }; +struct AnotherCCDBConsumer { + ConfigurableCCDBPath lhcPhasePath; + + void process(o2::aod::TOFCalibrationObjects const& ccdbObjectsForAllTimestamps) + { + LOGP(info, "AnotherCCDBConsumer LHCphase CCDB path configurable value: {}", lhcPhasePath.value); + for (auto& object : ccdbObjectsForAllTimestamps) { + std::cout << object.lhcPhase().getStartValidity() << " " << object.lhcPhase().getEndValidity() << std::endl; + } + } +}; + WorkflowSpec defineDataProcessing(ConfigContext const& cfgc) { return WorkflowSpec{ adaptAnalysisTask(cfgc), - adaptAnalysisTask(cfgc, TaskName{"simple-ccdb-cunsumer"}), + adaptAnalysisTask(cfgc, TaskName{"simple-ccdb-consumer"}), + adaptAnalysisTask(cfgc, TaskName{"another-ccdb-consumer"}), }; }