Skip to content

Commit e0cde8e

Browse files
committed
HistoryManager: Customizable aggregation window.
Because any cron expression is now allowed for the archivation schedule, the retroactive, per-day batching implementation was dropped. Added the option of running without an archive directory (`archive_dir` set to `None`).
1 parent d8b8531 commit e0cde8e

File tree

1 file changed

+23
-33
lines changed

1 file changed

+23
-33
lines changed

dp3/history_management/history_manager.py

+23-33
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,10 @@ def __init__(
111111
# Schedule datapoint archivation
112112
archive_config = self.config.datapoint_archivation
113113
self.keep_raw_delta = archive_config.older_than
114-
# TODO: Handle None as valid archive dir (simply delete datapoints)
115-
self.log_dir = self._ensure_log_dir(archive_config.archive_dir)
114+
if archive_config.archive_dir is not None:
115+
self.log_dir = self._ensure_log_dir(archive_config.archive_dir)
116+
else:
117+
self.log_dir = None
116118
registrar.scheduler_register(self.archive_old_dps, **archive_config.schedule.dict())
117119

118120
def delete_old_dps(self):
@@ -157,13 +159,9 @@ def archive_old_dps(self):
157159
Archives old data points from raw collection.
158160
159161
Updates already saved archive files, if present.
160-
161-
TODO: FIX archive file naming and generalize for shorter archivation windows
162-
Currently will overwrite existing archive files if run more than once a day.
163162
"""
164163

165164
t_old = datetime.utcnow() - self.keep_raw_delta
166-
t_old = t_old.replace(hour=0, minute=0, second=0, microsecond=0)
167165
self.log.debug("Archiving all records before %s ...", t_old)
168166

169167
max_date, min_date, total_dps = self._get_raw_dps_summary(t_old)
@@ -174,18 +172,19 @@ def archive_old_dps(self):
174172
"Found %s datapoints to archive in the range %s - %s", total_dps, min_date, max_date
175173
)
176174

177-
n_days = (max_date - min_date).days + 1
178-
for date, next_date in [
179-
(min_date + timedelta(days=n), min_date + timedelta(days=n + 1)) for n in range(n_days)
180-
]:
181-
date_string = date.strftime("%Y%m%d")
182-
day_datapoints = 0
183-
date_logfile = self.log_dir / f"dp-log-{date_string}.json"
175+
if self.log_dir is None:
176+
self.log.debug("No archive directory specified, skipping archivation.")
177+
else:
178+
min_date_string = min_date.strftime("%Y%m%dT%H%M%S")
179+
max_date_string = max_date.strftime("%Y%m%dT%H%M%S")
180+
date_logfile = self.log_dir / f"dp-log-{min_date_string}--{max_date_string}.json"
181+
datapoints = 0
184182

185183
with open(date_logfile, "w", encoding="utf-8") as logfile:
186184
first = True
185+
187186
for etype in self.model_spec.entities:
188-
result_cursor = self.db.get_raw(etype, after=date, before=next_date)
187+
result_cursor = self.db.get_raw(etype, after=min_date, before=t_old)
189188
for dp in result_cursor:
190189
if first:
191190
logfile.write(
@@ -196,23 +195,18 @@ def archive_old_dps(self):
196195
logfile.write(
197196
f",\n{json.dumps(self._reformat_dp(dp), cls=DatetimeEncoder)}"
198197
)
199-
day_datapoints += 1
198+
datapoints += 1
200199
logfile.write("\n]")
201-
self.log.debug(
202-
"%s: Archived %s datapoints to %s", date_string, day_datapoints, date_logfile
203-
)
200+
self.log.info("Archived %s datapoints to %s", datapoints, date_logfile)
204201
compress_file(date_logfile)
205202
os.remove(date_logfile)
206-
self.log.debug("%s: Saved archive was compressed", date_string)
203+
self.log.debug("Saved archive was compressed")
207204

208-
if not day_datapoints:
209-
continue
210-
211-
deleted_count = 0
212-
for etype in self.model_spec.entities:
213-
deleted_res = self.db.delete_old_raw_dps(etype, next_date)
214-
deleted_count += deleted_res.deleted_count
215-
self.log.debug("%s: Deleted %s datapoints", date_string, deleted_count)
205+
deleted_count = 0
206+
for etype in self.model_spec.entities:
207+
deleted_res = self.db.delete_old_raw_dps(etype, before=t_old)
208+
deleted_count += deleted_res.deleted_count
209+
self.log.info("Deleted %s datapoints", deleted_count)
216210

217211
@staticmethod
218212
def _reformat_dp(dp):
@@ -233,12 +227,8 @@ def _get_raw_dps_summary(
233227
date_ranges.append(range_summary)
234228
if not date_ranges:
235229
return None, None, 0
236-
min_date = min(x["earliest"] for x in date_ranges).replace(
237-
hour=0, minute=0, second=0, microsecond=0
238-
)
239-
max_date = max(x["latest"] for x in date_ranges).replace(
240-
hour=0, minute=0, second=0, microsecond=0
241-
)
230+
min_date = min(x["earliest"] for x in date_ranges)
231+
max_date = max(x["latest"] for x in date_ranges)
242232
total_dps = sum(x["count"] for x in date_ranges)
243233
return max_date, min_date, total_dps
244234

0 commit comments

Comments
 (0)