Skip to content

Commit 09c6912

Browse files
committed
Updater: Clearing old cache items.
1 parent ffaaf45 commit 09c6912

File tree

2 files changed

+42
-1
lines changed

2 files changed

+42
-1
lines changed

docs/configuration/updater.md

+13-1
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,16 @@ update_batch_period: "5m"
2626
```
2727
2828
Try to find a balance between having batches run too often (some batches may execute empty,
29-
leading to unnecessary overhead) and too rarely.
29+
leading to unnecessary overhead) and too rarely.
30+
31+
Additional options are available for the updater module, but they are not required.
32+
They are used to configure when cache management runs and the maximum number of entries in the cache.
33+
These are the options with their default values:
34+
35+
```yaml
36+
cache_management_cron: { hour: "2", minute: "0", second: "0" } # (1)!
37+
cache_max_entries: 32 # (2)!
38+
```
39+
40+
1. `cache_management_cron` - a cron schedule for the cache management. See [CronExpression docs][dp3.common.config.CronExpression] for details.
41+
2. `cache_max_entries` - the maximum number of entries in the cache. If the number of cache items (counted for each individual period) exceeds this number, the oldest entries are removed.

dp3/core/updater.py

+29
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,16 @@ class UpdaterConfig(BaseModel, extra="forbid"):
3030
update_batch_cron: A CRON expression for the periodic update.
3131
update_batch_period: The period of the periodic update.
3232
Should equal to the period of `update_batch_cron`.
33+
cache_management_cron: A CRON expression for the cache management.
34+
cache_max_entries: The maximum number of finished cache entries per thread_id.
3335
"""
3436

3537
update_batch_cron: CronExpression
3638
update_batch_period: ParsedTimedelta
3739

40+
cache_management_cron: CronExpression = CronExpression(hour=2, minute=0, second=0)
41+
cache_max_entries: int = 32
42+
3843

3944
class UpdateThreadState(BaseModel, validate_assignment=True):
4045
"""
@@ -131,6 +136,27 @@ def upsert(self, state: UpdateThreadState) -> UpdateResult:
131136
}
132137
return self._cache.update_one(filter_dict, update=update_dict, upsert=True)
133138

139+
def register_management(
140+
self, scheduler: Scheduler, trigger: CronExpression, max_entries: int
141+
) -> int:
142+
"""Registers the cache management task with the scheduler."""
143+
return scheduler.register(
144+
self._manage_cache, func_args=[max_entries], **trigger.model_dump()
145+
)
146+
147+
def _manage_cache(self, max_entries: int):
148+
"""Removes the oldest finished cache entries if their count exceeds the limit."""
149+
counts_per_thread_id = defaultdict(int)
150+
to_delete = []
151+
152+
for state in self._cache.find({"type": "state", "finished": True}).sort("t_created", -1):
153+
state_obj = UpdateThreadState.model_validate(state)
154+
counts_per_thread_id[state_obj.thread_id] += 1
155+
if counts_per_thread_id[state_obj.thread_id] > max_entries:
156+
to_delete.append(state["_id"])
157+
158+
self._cache.delete_many({"_id": {"$in": to_delete}})
159+
134160
def _setup_cache_indexes(self):
135161
"""Sets up the indexes of the state cache."""
136162
self._cache.create_index("t_created", background=True)
@@ -163,6 +189,9 @@ def __init__(
163189

164190
# Get state cache
165191
self.cache = UpdaterCache(self.db.get_module_cache("Updater"))
192+
self.cache.register_management(
193+
scheduler, self.config.cache_management_cron, self.config.cache_max_entries
194+
)
166195

167196
self.update_thread_hooks = defaultdict(dict)
168197

0 commit comments

Comments
 (0)