From 729395ccffc8c9bcc8afa19894e6960a991ae191 Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Fri, 11 Apr 2025 10:19:19 +0200 Subject: [PATCH 1/5] add resource monitoring in addEventData --- docs/CHANGELOG.asciidoc | 1 + include/model/CBucketGatherer.h | 2 +- lib/model/CBucketGatherer.cc | 5 +++-- lib/model/CDataGatherer.cc | 2 +- 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/docs/CHANGELOG.asciidoc b/docs/CHANGELOG.asciidoc index 75117ca066..ff1e264d6e 100644 --- a/docs/CHANGELOG.asciidoc +++ b/docs/CHANGELOG.asciidoc @@ -33,6 +33,7 @@ === Enhancements * Track memory used in the hierarchical results normalizer. (See {ml-pull}2831[#2831].) +* Improve adherence to memory limits for the bucket gatherer. (See {ml-pull}2831[#2831].) === Bug Fixes diff --git a/include/model/CBucketGatherer.h b/include/model/CBucketGatherer.h index 24c4256733..383c61bb72 100644 --- a/include/model/CBucketGatherer.h +++ b/include/model/CBucketGatherer.h @@ -220,7 +220,7 @@ class MODEL_EXPORT CBucketGatherer { CResourceMonitor& resourceMonitor) = 0; //! Record the arrival of \p data at \p time. - bool addEventData(CEventData& data); + bool addEventData(CEventData& data, CResourceMonitor& resourceMonitor); //! Roll time forwards to \p time. void timeNow(core_t::TTime time); diff --git a/lib/model/CBucketGatherer.cc b/lib/model/CBucketGatherer.cc index 7a987ec6ae..90d0f08225 100644 --- a/lib/model/CBucketGatherer.cc +++ b/lib/model/CBucketGatherer.cc @@ -28,6 +28,7 @@ #include #include +#include namespace ml { namespace model { @@ -230,7 +231,7 @@ CBucketGatherer::CBucketGatherer(bool isForPersistence, const CBucketGatherer& o } } -bool CBucketGatherer::addEventData(CEventData& data) { +bool CBucketGatherer::addEventData(CEventData& data, CResourceMonitor& resourceMonitor) { core_t::TTime time = data.time(); if (time < this->earliestBucketStartTime()) { @@ -293,7 +294,7 @@ bool CBucketGatherer::addEventData(CEventData& data) { if (influence) { const std::string& inf = *influence; canonicalInfluences[i] = inf; - if (count > 0) { + if (count > 0 && resourceMonitor.areAllocationsAllowed()) { influencerCounts[i] .emplace(boost::unordered::piecewise_construct, boost::make_tuple(pidCid, inf), diff --git a/lib/model/CDataGatherer.cc b/lib/model/CDataGatherer.cc index d224feb9aa..58d40791dd 100644 --- a/lib/model/CDataGatherer.cc +++ b/lib/model/CDataGatherer.cc @@ -316,7 +316,7 @@ bool CDataGatherer::addArrival(const TStrCPtrVec& fieldValues, return false; } - return m_BucketGatherer->addEventData(data); + return m_BucketGatherer->addEventData(data, resourceMonitor); } void CDataGatherer::sampleNow(core_t::TTime sampleBucketStart) { From 108f3cdf8c830405711d9c5bd982c8c484c4df86 Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Fri, 11 Apr 2025 10:21:38 +0200 Subject: [PATCH 2/5] update PR number --- docs/CHANGELOG.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/CHANGELOG.asciidoc b/docs/CHANGELOG.asciidoc index ff1e264d6e..8eed2baa88 100644 --- a/docs/CHANGELOG.asciidoc +++ b/docs/CHANGELOG.asciidoc @@ -33,7 +33,7 @@ === Enhancements * Track memory used in the hierarchical results normalizer. (See {ml-pull}2831[#2831].) -* Improve adherence to memory limits for the bucket gatherer. (See {ml-pull}2831[#2831].) +* Improve adherence to memory limits for the bucket gatherer. (See {ml-pull}2848[#2848].) === Bug Fixes From e642db4abd0018ade1bd9dc9e6953110d5045a50 Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Mon, 14 Apr 2025 10:47:44 +0200 Subject: [PATCH 3/5] linting --- include/model/CBucketGatherer.h | 3 ++- lib/model/CBucketGatherer.cc | 7 +++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/include/model/CBucketGatherer.h b/include/model/CBucketGatherer.h index 383c61bb72..6ecdcb25a7 100644 --- a/include/model/CBucketGatherer.h +++ b/include/model/CBucketGatherer.h @@ -154,6 +154,7 @@ class MODEL_EXPORT CBucketGatherer { //! redundant except to create a signature that will not be mistaken for //! a general purpose copy constructor. CBucketGatherer(bool isForPersistence, const CBucketGatherer& other); + bool isRecordIncomplete(CEventData& data); virtual ~CBucketGatherer() = default; //@} @@ -220,7 +221,7 @@ class MODEL_EXPORT CBucketGatherer { CResourceMonitor& resourceMonitor) = 0; //! Record the arrival of \p data at \p time. - bool addEventData(CEventData& data, CResourceMonitor& resourceMonitor); + bool addEventData(CEventData& data, const CResourceMonitor& resourceMonitor); //! Roll time forwards to \p time. void timeNow(core_t::TTime time); diff --git a/lib/model/CBucketGatherer.cc b/lib/model/CBucketGatherer.cc index 90d0f08225..b67ed224fb 100644 --- a/lib/model/CBucketGatherer.cc +++ b/lib/model/CBucketGatherer.cc @@ -231,7 +231,10 @@ CBucketGatherer::CBucketGatherer(bool isForPersistence, const CBucketGatherer& o } } -bool CBucketGatherer::addEventData(CEventData& data, CResourceMonitor& resourceMonitor) { +bool CBucketGatherer::isRecordIncomplete(CEventData& data) { + return !data.personId() || !data.attributeId() || !data.count(); +} +bool CBucketGatherer::addEventData(CEventData& data, const CResourceMonitor& resourceMonitor) { core_t::TTime time = data.time(); if (time < this->earliestBucketStartTime()) { @@ -243,7 +246,7 @@ bool CBucketGatherer::addEventData(CEventData& data, CResourceMonitor& resourceM this->timeNow(time); - if (!data.personId() || !data.attributeId() || !data.count()) { + if (isRecordIncomplete(data)) { // The record was incomplete. return false; } From 9c0b3d7af09e1f210f637cfede127d28b7733053 Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Mon, 14 Apr 2025 11:41:06 +0200 Subject: [PATCH 4/5] linting --- include/model/CBucketGatherer.h | 6 +- lib/model/CBucketGatherer.cc | 120 ++++++++++++++++++-------------- 2 files changed, 73 insertions(+), 53 deletions(-) diff --git a/include/model/CBucketGatherer.h b/include/model/CBucketGatherer.h index 54a91eafd3..4313f8b4b9 100644 --- a/include/model/CBucketGatherer.h +++ b/include/model/CBucketGatherer.h @@ -172,7 +172,9 @@ class MODEL_EXPORT CBucketGatherer { //! redundant except to create a signature that will not be mistaken for //! a general purpose copy constructor. CBucketGatherer(bool isForPersistence, const CBucketGatherer& other); - bool isRecordIncomplete(CEventData& data); + static bool isRecordIncomplete(const CEventData& data); + bool hasValidPersonAndAttributeIds(std::size_t pid, std::size_t cid) const; + bool handleExplicitNull(const CEventData& data, core_t::TTime time, TSizeSizePr pidCid); virtual ~CBucketGatherer() = default; //@} @@ -239,7 +241,7 @@ class MODEL_EXPORT CBucketGatherer { CResourceMonitor& resourceMonitor) = 0; //! Record the arrival of \p data at \p time. - bool addEventData(CEventData& data, const CResourceMonitor& resourceMonitor); + bool addEventData(const CEventData& data, const CResourceMonitor& resourceMonitor); //! Roll time forwards to \p time. void timeNow(core_t::TTime time); diff --git a/lib/model/CBucketGatherer.cc b/lib/model/CBucketGatherer.cc index 2a122cbf18..7bf77b2704 100644 --- a/lib/model/CBucketGatherer.cc +++ b/lib/model/CBucketGatherer.cc @@ -234,10 +234,36 @@ CBucketGatherer::CBucketGatherer(bool isForPersistence, const CBucketGatherer& o } } -bool CBucketGatherer::isRecordIncomplete(CEventData& data) { +bool CBucketGatherer::isRecordIncomplete(const CEventData& data) { return !data.personId() || !data.attributeId() || !data.count(); } -bool CBucketGatherer::addEventData(CEventData& data, const CResourceMonitor& resourceMonitor) { +bool CBucketGatherer::hasValidPersonAndAttributeIds(std::size_t const pid, + std::size_t const cid) const { + // Has the person/attribute been deleted from the gatherer? + if (!m_DataGatherer.isPersonActive(pid)) { + LOG_DEBUG(<< "Not adding value for deleted person " << pid); + return false; + } + if (m_DataGatherer.isPopulation() && !m_DataGatherer.isAttributeActive(cid)) { + LOG_DEBUG(<< "Not adding value for deleted attribute " << cid); + return false; + } + return true; +} +bool CBucketGatherer::handleExplicitNull(const CEventData& data, + core_t::TTime const time, + CBucketGatherer::TSizeSizePr const pidCid) { + // If record is explicit null just note that a null record has been seen + // for the given (pid, cid) pair. + if (data.isExplicitNull()) { + TSizeSizePrUSet& bucketExplicitNulls = m_PersonAttributeExplicitNulls.get(time); + bucketExplicitNulls.insert(pidCid); + return true; + } + return false; +} +bool CBucketGatherer::addEventData(const CEventData& data, + const CResourceMonitor& resourceMonitor) { core_t::TTime const time = data.time(); if (time < this->earliestBucketStartTime()) { @@ -250,69 +276,61 @@ bool CBucketGatherer::addEventData(CEventData& data, const CResourceMonitor& res this->timeNow(time); if (isRecordIncomplete(data)) { - // The record was incomplete. return false; } std::size_t const pid = *data.personId(); std::size_t const cid = *data.attributeId(); std::size_t const count = *data.count(); - if ((pid != CDynamicStringIdRegistry::INVALID_ID) && - (cid != CDynamicStringIdRegistry::INVALID_ID)) { - // Has the person/attribute been deleted from the gatherer? - if (!m_DataGatherer.isPersonActive(pid)) { - LOG_DEBUG(<< "Not adding value for deleted person " << pid); - return false; - } - if (m_DataGatherer.isPopulation() && !m_DataGatherer.isAttributeActive(cid)) { - LOG_DEBUG(<< "Not adding value for deleted attribute " << cid); - return false; - } - TSizeSizePr const pidCid = std::make_pair(pid, cid); + if ((pid == CDynamicStringIdRegistry::INVALID_ID) || + (cid == CDynamicStringIdRegistry::INVALID_ID)) { + return true; + } - // If record is explicit null just note that a null record has been seen - // for the given (pid, cid) pair. - if (data.isExplicitNull()) { - TSizeSizePrUSet& bucketExplicitNulls = - m_PersonAttributeExplicitNulls.get(time); - bucketExplicitNulls.insert(pidCid); - return true; - } + if (hasValidPersonAndAttributeIds(pid, cid) == false) { + return false; + } - TSizeSizePrUInt64UMap& bucketCounts = m_PersonAttributeCounts.get(time); - if (count > 0) { - bucketCounts[pidCid] += count; - } + TSizeSizePr const pidCid = std::make_pair(pid, cid); - const CEventData::TOptionalStrVec& influences = data.influences(); - auto& influencerCounts = m_InfluencerCounts.get(time); - if (influences.size() != influencerCounts.size()) { - LOG_ERROR(<< "Unexpected influences: " << influences << " expected " - << core::CContainerPrinter::print(this->beginInfluencers(), - this->endInfluencers())); - return false; - } + if (handleExplicitNull(data, time, pidCid)) { + return true; + } - TOptionalStrVec canonicalInfluences(influencerCounts.size()); - for (std::size_t i = 0; i < influences.size(); ++i) { - const CEventData::TOptionalStr& influence = influences[i]; - if (influence) { - const std::string& inf = *influence; - canonicalInfluences[i] = inf; - if (count > 0 && resourceMonitor.areAllocationsAllowed()) { - influencerCounts[i] - .emplace(boost::unordered::piecewise_construct, - boost::make_tuple(pidCid, inf), - boost::make_tuple(static_cast(0))) - .first->second += count; - } + TSizeSizePrUInt64UMap& bucketCounts = m_PersonAttributeCounts.get(time); + if (count > 0) { + bucketCounts[pidCid] += count; + } + + const CEventData::TOptionalStrVec& influences = data.influences(); + auto& influencerCounts = m_InfluencerCounts.get(time); + if (influences.size() != influencerCounts.size()) { + LOG_ERROR(<< "Unexpected influences: " << influences << " expected " + << core::CContainerPrinter::print(this->beginInfluencers(), + this->endInfluencers())); + return false; + } + + TOptionalStrVec canonicalInfluences(influencerCounts.size()); + auto updateInfluencer = [&](std::size_t i) { + if (const CEventData::TOptionalStr& influence = influences[i]) { + const std::string& inf = *influence; + canonicalInfluences[i] = inf; + if (count > 0 && resourceMonitor.areAllocationsAllowed()) { + influencerCounts[i] + .emplace(boost::unordered::piecewise_construct, + boost::make_tuple(pidCid, inf), + boost::make_tuple(static_cast(0))) + .first->second += count; } } - - this->addValue(pid, cid, time, data.values(), count, data.stringValue(), - canonicalInfluences); + }; + for (std::size_t i = 0; i < influences.size(); ++i) { + updateInfluencer(i); } + + this->addValue(pid, cid, time, data.values(), count, data.stringValue(), canonicalInfluences); return true; } From 9220adaa8d5ab9b3c547a06508164a2b095221bc Mon Sep 17 00:00:00 2001 From: Valeriy Khakhutskyy <1292899+valeriy42@users.noreply.github.com> Date: Mon, 14 Apr 2025 11:43:04 +0200 Subject: [PATCH 5/5] remove unused includes --- include/model/CBucketGatherer.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/include/model/CBucketGatherer.h b/include/model/CBucketGatherer.h index 4313f8b4b9..6af07e4fdf 100644 --- a/include/model/CBucketGatherer.h +++ b/include/model/CBucketGatherer.h @@ -14,7 +14,6 @@ #include #include -#include #include #include @@ -28,7 +27,6 @@ #include #include -#include #include #include #include