diff --git a/docs/CHANGELOG.asciidoc b/docs/CHANGELOG.asciidoc index 75117ca06..8eed2baa8 100644 --- a/docs/CHANGELOG.asciidoc +++ b/docs/CHANGELOG.asciidoc @@ -33,6 +33,7 @@ === Enhancements * Track memory used in the hierarchical results normalizer. (See {ml-pull}2831[#2831].) +* Improve adherence to memory limits for the bucket gatherer. (See {ml-pull}2848[#2848].) === Bug Fixes diff --git a/include/model/CBucketGatherer.h b/include/model/CBucketGatherer.h index 82000ea0d..6af07e4fd 100644 --- a/include/model/CBucketGatherer.h +++ b/include/model/CBucketGatherer.h @@ -14,7 +14,6 @@ #include #include -#include #include #include @@ -28,7 +27,6 @@ #include #include -#include #include #include #include @@ -172,6 +170,9 @@ class MODEL_EXPORT CBucketGatherer { //! redundant except to create a signature that will not be mistaken for //! a general purpose copy constructor. CBucketGatherer(bool isForPersistence, const CBucketGatherer& other); + static bool isRecordIncomplete(const CEventData& data); + bool hasValidPersonAndAttributeIds(std::size_t pid, std::size_t cid) const; + bool handleExplicitNull(const CEventData& data, core_t::TTime time, TSizeSizePr pidCid); virtual ~CBucketGatherer() = default; //@} @@ -238,7 +239,7 @@ class MODEL_EXPORT CBucketGatherer { CResourceMonitor& resourceMonitor) = 0; //! Record the arrival of \p data at \p time. - bool addEventData(CEventData& data); + bool addEventData(const CEventData& data, const CResourceMonitor& resourceMonitor); //! Roll time forwards to \p time. void timeNow(core_t::TTime time); diff --git a/lib/model/CBucketGatherer.cc b/lib/model/CBucketGatherer.cc index 1c59ddd44..7bf77b270 100644 --- a/lib/model/CBucketGatherer.cc +++ b/lib/model/CBucketGatherer.cc @@ -23,6 +23,7 @@ #include #include +#include #include @@ -233,7 +234,36 @@ CBucketGatherer::CBucketGatherer(bool isForPersistence, const CBucketGatherer& o } } -bool CBucketGatherer::addEventData(CEventData& data) { +bool CBucketGatherer::isRecordIncomplete(const CEventData& data) { + return !data.personId() || !data.attributeId() || !data.count(); +} +bool CBucketGatherer::hasValidPersonAndAttributeIds(std::size_t const pid, + std::size_t const cid) const { + // Has the person/attribute been deleted from the gatherer? + if (!m_DataGatherer.isPersonActive(pid)) { + LOG_DEBUG(<< "Not adding value for deleted person " << pid); + return false; + } + if (m_DataGatherer.isPopulation() && !m_DataGatherer.isAttributeActive(cid)) { + LOG_DEBUG(<< "Not adding value for deleted attribute " << cid); + return false; + } + return true; +} +bool CBucketGatherer::handleExplicitNull(const CEventData& data, + core_t::TTime const time, + CBucketGatherer::TSizeSizePr const pidCid) { + // If record is explicit null just note that a null record has been seen + // for the given (pid, cid) pair. + if (data.isExplicitNull()) { + TSizeSizePrUSet& bucketExplicitNulls = m_PersonAttributeExplicitNulls.get(time); + bucketExplicitNulls.insert(pidCid); + return true; + } + return false; +} +bool CBucketGatherer::addEventData(const CEventData& data, + const CResourceMonitor& resourceMonitor) { core_t::TTime const time = data.time(); if (time < this->earliestBucketStartTime()) { @@ -245,70 +275,62 @@ bool CBucketGatherer::addEventData(CEventData& data) { this->timeNow(time); - if (!data.personId() || !data.attributeId() || !data.count()) { - // The record was incomplete. + if (isRecordIncomplete(data)) { return false; } std::size_t const pid = *data.personId(); std::size_t const cid = *data.attributeId(); std::size_t const count = *data.count(); - if ((pid != CDynamicStringIdRegistry::INVALID_ID) && - (cid != CDynamicStringIdRegistry::INVALID_ID)) { - // Has the person/attribute been deleted from the gatherer? - if (!m_DataGatherer.isPersonActive(pid)) { - LOG_DEBUG(<< "Not adding value for deleted person " << pid); - return false; - } - if (m_DataGatherer.isPopulation() && !m_DataGatherer.isAttributeActive(cid)) { - LOG_DEBUG(<< "Not adding value for deleted attribute " << cid); - return false; - } - TSizeSizePr const pidCid = std::make_pair(pid, cid); + if ((pid == CDynamicStringIdRegistry::INVALID_ID) || + (cid == CDynamicStringIdRegistry::INVALID_ID)) { + return true; + } + + if (hasValidPersonAndAttributeIds(pid, cid) == false) { + return false; + } - // If record is explicit null just note that a null record has been seen - // for the given (pid, cid) pair. - if (data.isExplicitNull()) { - TSizeSizePrUSet& bucketExplicitNulls = - m_PersonAttributeExplicitNulls.get(time); - bucketExplicitNulls.insert(pidCid); - return true; - } + TSizeSizePr const pidCid = std::make_pair(pid, cid); - TSizeSizePrUInt64UMap& bucketCounts = m_PersonAttributeCounts.get(time); - if (count > 0) { - bucketCounts[pidCid] += count; - } + if (handleExplicitNull(data, time, pidCid)) { + return true; + } - const CEventData::TOptionalStrVec& influences = data.influences(); - auto& influencerCounts = m_InfluencerCounts.get(time); - if (influences.size() != influencerCounts.size()) { - LOG_ERROR(<< "Unexpected influences: " << influences << " expected " - << core::CContainerPrinter::print(this->beginInfluencers(), - this->endInfluencers())); - return false; - } + TSizeSizePrUInt64UMap& bucketCounts = m_PersonAttributeCounts.get(time); + if (count > 0) { + bucketCounts[pidCid] += count; + } - TOptionalStrVec canonicalInfluences(influencerCounts.size()); - for (std::size_t i = 0; i < influences.size(); ++i) { - const CEventData::TOptionalStr& influence = influences[i]; - if (influence) { - const std::string& inf = *influence; - canonicalInfluences[i] = inf; - if (count > 0) { - influencerCounts[i] - .emplace(boost::unordered::piecewise_construct, - boost::make_tuple(pidCid, inf), - boost::make_tuple(static_cast(0))) - .first->second += count; - } + const CEventData::TOptionalStrVec& influences = data.influences(); + auto& influencerCounts = m_InfluencerCounts.get(time); + if (influences.size() != influencerCounts.size()) { + LOG_ERROR(<< "Unexpected influences: " << influences << " expected " + << core::CContainerPrinter::print(this->beginInfluencers(), + this->endInfluencers())); + return false; + } + + TOptionalStrVec canonicalInfluences(influencerCounts.size()); + auto updateInfluencer = [&](std::size_t i) { + if (const CEventData::TOptionalStr& influence = influences[i]) { + const std::string& inf = *influence; + canonicalInfluences[i] = inf; + if (count > 0 && resourceMonitor.areAllocationsAllowed()) { + influencerCounts[i] + .emplace(boost::unordered::piecewise_construct, + boost::make_tuple(pidCid, inf), + boost::make_tuple(static_cast(0))) + .first->second += count; } } - - this->addValue(pid, cid, time, data.values(), count, data.stringValue(), - canonicalInfluences); + }; + for (std::size_t i = 0; i < influences.size(); ++i) { + updateInfluencer(i); } + + this->addValue(pid, cid, time, data.values(), count, data.stringValue(), canonicalInfluences); return true; } diff --git a/lib/model/CDataGatherer.cc b/lib/model/CDataGatherer.cc index dffe6efb8..8699d508a 100644 --- a/lib/model/CDataGatherer.cc +++ b/lib/model/CDataGatherer.cc @@ -304,7 +304,7 @@ bool CDataGatherer::addArrival(const TStrCPtrVec& fieldValues, return false; } - return m_BucketGatherer->addEventData(data); + return m_BucketGatherer->addEventData(data, resourceMonitor); } void CDataGatherer::sampleNow(core_t::TTime sampleBucketStart) {