diff --git a/Framework/Core/include/Framework/ComputingQuotaEvaluator.h b/Framework/Core/include/Framework/ComputingQuotaEvaluator.h index 17ce9c2ba3e65..b25bc1611d79f 100644 --- a/Framework/Core/include/Framework/ComputingQuotaEvaluator.h +++ b/Framework/Core/include/Framework/ComputingQuotaEvaluator.h @@ -37,7 +37,9 @@ class ComputingQuotaEvaluator /// @a task the task which needs some quota /// @a request the resource request the @a task needs /// @a now the time (e.g. uv_now) when invoked. - bool selectOffer(int task, ComputingQuotaRequest const& request, uint64_t now); + /// @a accumulated if non-null, filled with the resources accumulated from + /// selected offers (useful for diagnosing shortfalls on failure). + bool selectOffer(int task, ComputingQuotaRequest const& request, uint64_t now, ComputingQuotaOffer* accumulated = nullptr); /// Consume offers for a given taskId /// @a reportConsumedOffer callback which reports back that an offer has been consumed. void consume(int taskId, diff --git a/Framework/Core/include/Framework/ResourcePolicy.h b/Framework/Core/include/Framework/ResourcePolicy.h index eb8d77b209a8f..1062c223b07f6 100644 --- a/Framework/Core/include/Framework/ResourcePolicy.h +++ b/Framework/Core/include/Framework/ResourcePolicy.h @@ -31,6 +31,9 @@ struct ResourcePolicy { std::string name; Matcher matcher; ComputingQuotaRequest request; + /// Minimum resources required to run. Used to report which resources + /// are missing when scheduling fails. + ComputingQuotaOffer minRequired; }; } // namespace o2::framework diff --git a/Framework/Core/src/ComputingQuotaEvaluator.cxx b/Framework/Core/src/ComputingQuotaEvaluator.cxx index 3f5bff2b53fab..5dd4249cab519 100644 --- a/Framework/Core/src/ComputingQuotaEvaluator.cxx +++ b/Framework/Core/src/ComputingQuotaEvaluator.cxx @@ -62,7 +62,7 @@ struct QuotaEvaluatorStats { std::vector expired; }; -bool ComputingQuotaEvaluator::selectOffer(int task, ComputingQuotaRequest const& selector, uint64_t now) +bool ComputingQuotaEvaluator::selectOffer(int task, ComputingQuotaRequest const& selector, uint64_t now, ComputingQuotaOffer* outAccumulated) { O2_SIGNPOST_ID_GENERATE(qid, quota); @@ -102,10 +102,13 @@ bool ComputingQuotaEvaluator::selectOffer(int task, ComputingQuotaRequest const& } dpStats.updateStats({static_cast(ProcessingStatsId::RESOURCES_SATISFACTORY), DataProcessingStats::Op::Add, 1}); } else { - O2_SIGNPOST_START(quota, sid, "summary", "Not enough resources to select offers."); - dpStats.updateStats({static_cast(ProcessingStatsId::RESOURCES_MISSING), DataProcessingStats::Op::Add, 1}); if (result.size()) { + O2_SIGNPOST_START(quota, sid, "summary", "Not enough resources: accumulated %zu partial offers providing cpu=%d, memory=%lld MB, shared memory=%lld MB, timeslices=%lld, but still insufficient.", + result.size(), totalOffer.cpu, totalOffer.memory / 1000000, totalOffer.sharedMemory / 1000000, totalOffer.timeslices); dpStats.updateStats({static_cast(ProcessingStatsId::RESOURCES_INSUFFICIENT), DataProcessingStats::Op::Add, 1}); + } else { + O2_SIGNPOST_START(quota, sid, "summary", "Not enough resources: no suitable offers found (all offers were invalid, expired, or owned by other tasks)."); + dpStats.updateStats({static_cast(ProcessingStatsId::RESOURCES_MISSING), DataProcessingStats::Op::Add, 1}); } } if (stats.invalidOffers.size()) { @@ -205,7 +208,11 @@ bool ComputingQuotaEvaluator::selectOffer(int task, ComputingQuotaRequest const& O2_SIGNPOST_EVENT_EMIT(quota, tid, "select", "Offer should be expired by now, checking again."); }, minValidity + 100, 0); } // If we get here it means we never got enough offers, so we return false. - return summarizeWhatHappended(enough, stats.selectedOffers, accumulated, stats); + bool result = summarizeWhatHappended(enough, stats.selectedOffers, accumulated, stats); + if (outAccumulated) { + *outAccumulated = accumulated; + } + return result; } void ComputingQuotaEvaluator::consume(int id, ComputingQuotaConsumer& consumer, std::function& reportConsumedOffer) diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index b062f2bf68a75..906dd0c75dbf4 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -1371,7 +1371,8 @@ void DataProcessingDevice::Run() // the evaluator. In this case, the request is always satisfied and // we run on whatever resource is available. auto& spec = ref.get(); - bool enough = ref.get().selectOffer(streamRef.index, spec.resourcePolicy.request, uv_now(state.loop)); + ComputingQuotaOffer accumulated; + bool enough = ref.get().selectOffer(streamRef.index, spec.resourcePolicy.request, uv_now(state.loop), &accumulated); struct SchedulingStats { std::atomic lastScheduled = 0; @@ -1399,17 +1400,42 @@ void DataProcessingDevice::Run() run_completion(&handle, 0); } } else { + auto const lastSched = schedulingStats.lastScheduled.load(); + auto const schedInfo = lastSched ? fmt::format(", last scheduled {} ms ago", uv_now(state.loop) - lastSched) : std::string(", never successfully scheduled"); + auto const buildMissingInfo = [&]() { + auto const& required = spec.resourcePolicy.minRequired; + std::string missingInfo; + if (required.sharedMemory > 0 && accumulated.sharedMemory < required.sharedMemory) { + missingInfo += fmt::format(" shared memory (have {} MB, need {} MB)", accumulated.sharedMemory / 1000000, required.sharedMemory / 1000000); + } + if (required.timeslices > 0 && accumulated.timeslices < required.timeslices) { + missingInfo += fmt::format(" timeslices (have {}, need {})", accumulated.timeslices, required.timeslices); + } + if (required.cpu > 0 && accumulated.cpu < required.cpu) { + missingInfo += fmt::format(" CPU cores (have {}, need {})", accumulated.cpu, required.cpu); + } + if (required.memory > 0 && accumulated.memory < required.memory) { + missingInfo += fmt::format(" memory (have {} MB, need {} MB)", accumulated.memory / 1000000, required.memory / 1000000); + } + return missingInfo.empty() ? std::string(" (policy: ") + spec.resourcePolicy.name + ")" : " -" + missingInfo; + }; if (schedulingStats.numberOfUnscheduledSinceLastScheduled >= schedulingStats.nextWarnAt) { + auto const missingStr = buildMissingInfo(); O2_SIGNPOST_EVENT_EMIT_WARN(scheduling, sid, "Run", - "Not enough resources to schedule computation. %zu skipped so far. Last scheduled at %zu. Data is not lost and it will be scheduled again.", + "Not enough resources to schedule computation on stream %d. %zu consecutive skips%s. Missing:%s. Data is not lost and it will be scheduled again.", + streamRef.index, schedulingStats.numberOfUnscheduledSinceLastScheduled.load(), - schedulingStats.lastScheduled.load()); + schedInfo.c_str(), + missingStr.c_str()); schedulingStats.nextWarnAt = schedulingStats.nextWarnAt * 2; } else { + auto const missingStr = buildMissingInfo(); O2_SIGNPOST_EVENT_EMIT(scheduling, sid, "Run", - "Not enough resources to schedule computation. %zu skipped so far. Last scheduled at %zu. Data is not lost and it will be scheduled again.", + "Not enough resources to schedule computation on stream %d. %zu consecutive skips%s. Missing:%s. Data is not lost and it will be scheduled again.", + streamRef.index, schedulingStats.numberOfUnscheduledSinceLastScheduled.load(), - schedulingStats.lastScheduled.load()); + schedInfo.c_str(), + missingStr.c_str()); } schedulingStats.numberOfUnscheduled++; schedulingStats.numberOfUnscheduledSinceLastScheduled++; diff --git a/Framework/Core/src/ResourcePolicyHelpers.cxx b/Framework/Core/src/ResourcePolicyHelpers.cxx index 2c5c4f54dd9b5..650beec3ac599 100644 --- a/Framework/Core/src/ResourcePolicyHelpers.cxx +++ b/Framework/Core/src/ResourcePolicyHelpers.cxx @@ -36,7 +36,8 @@ ResourcePolicy ResourcePolicyHelpers::cpuBoundTask(char const* s, int requestedC [matcher = std::regex(s)](DeviceSpec const& spec) -> bool { return std::regex_match(spec.name, matcher); }, - [requestedCPUs](ComputingQuotaOffer const& offer, ComputingQuotaOffer const& accumulated) -> OfferScore { return accumulated.cpu >= requestedCPUs ? OfferScore::Enough : OfferScore::More; }}; + [requestedCPUs](ComputingQuotaOffer const& offer, ComputingQuotaOffer const& accumulated) -> OfferScore { return accumulated.cpu >= requestedCPUs ? OfferScore::Enough : OfferScore::More; }, + ComputingQuotaOffer{.cpu = requestedCPUs}}; } ResourcePolicy ResourcePolicyHelpers::rateLimitedSharedMemoryBoundTask(char const* s, int requestedSharedMemory, int requestedTimeslices) @@ -46,7 +47,7 @@ ResourcePolicy ResourcePolicyHelpers::rateLimitedSharedMemoryBoundTask(char cons [matcher = std::regex(s)](DeviceSpec const& spec) -> bool { return std::regex_match(spec.name, matcher); }, - [requestedSharedMemory, requestedTimeslices](ComputingQuotaOffer const& offer, ComputingQuotaOffer const& accumulated) -> OfferScore { + [requestedSharedMemory, requestedTimeslices](ComputingQuotaOffer const& offer, ComputingQuotaOffer const& accumulated) -> OfferScore { // If we have enough memory and not enough timeslices, // ignore further shared memory. if (accumulated.sharedMemory >= requestedSharedMemory && offer.timeslices == 0) { @@ -66,7 +67,8 @@ ResourcePolicy ResourcePolicyHelpers::rateLimitedSharedMemoryBoundTask(char cons return OfferScore::Enough; } // We need more resources - return OfferScore::More; }}; + return OfferScore::More; }, + ComputingQuotaOffer{.sharedMemory = requestedSharedMemory, .timeslices = requestedTimeslices}}; } ResourcePolicy ResourcePolicyHelpers::sharedMemoryBoundTask(char const* s, int requestedSharedMemory) @@ -76,11 +78,12 @@ ResourcePolicy ResourcePolicyHelpers::sharedMemoryBoundTask(char const* s, int r [matcher = std::regex(s)](DeviceSpec const& spec) -> bool { return std::regex_match(spec.name, matcher); }, - [requestedSharedMemory](ComputingQuotaOffer const& offer, ComputingQuotaOffer const& accumulated) -> OfferScore { + [requestedSharedMemory](ComputingQuotaOffer const& offer, ComputingQuotaOffer const& accumulated) -> OfferScore { if (offer.sharedMemory == 0) { return OfferScore::Unneeded; } - return (accumulated.sharedMemory + offer.sharedMemory)>= requestedSharedMemory ? OfferScore::Enough : OfferScore::More; }}; + return (accumulated.sharedMemory + offer.sharedMemory) >= requestedSharedMemory ? OfferScore::Enough : OfferScore::More; }, + ComputingQuotaOffer{.sharedMemory = requestedSharedMemory}}; } } // namespace o2::framework