-
-
Notifications
You must be signed in to change notification settings - Fork 174
Expand file tree
/
Copy pathschedules.py
More file actions
238 lines (216 loc) · 7.67 KB
/
schedules.py
File metadata and controls
238 lines (216 loc) · 7.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
"""Schedule store — recurring prompts fired without human intervention."""
import json
import re
import time
import threading
import uuid
from pathlib import Path
# Interval parsing: "every 30m", "every 1h", "every 2h", "daily at 09:00"
_INTERVAL_RE = re.compile(
r"every\s+(\d+)\s*(m|min|h|hr|d|day)s?\b",
re.IGNORECASE
)
_DAILY_RE = re.compile(
r"daily\s+at\s+(\d{1,2}):(\d{2})\b",
re.IGNORECASE
)
def parse_schedule_spec(spec: str) -> tuple[int | None, str | None]:
"""Parse natural-language schedule spec.
Returns (interval_seconds, None) for intervals, or (None, cron_expr) for daily.
Returns (None, None) if unparseable.
"""
spec = spec.strip()
m = _INTERVAL_RE.search(spec)
if m:
val = int(m.group(1))
unit = m.group(2).lower()
if unit in ("m", "min"):
secs = val * 60
elif unit in ("h", "hr"):
secs = val * 3600
elif unit in ("d", "day"):
secs = val * 86400
else:
return (None, None)
if secs < 60:
secs = 60 # minimum 1 minute
return (secs, None)
dm = _DAILY_RE.search(spec)
if dm:
hour = int(dm.group(1)) % 24
minute = int(dm.group(2)) % 60
# Store as "HH:MM" for daily; we compute next_run from current time
return (86400, f"{hour:02d}:{minute:02d}") # interval=1 day, time hint
return (None, None)
def compute_next_run(
interval_seconds: int,
last_run: float | None,
daily_at: str | None = None,
) -> float:
"""Compute next run timestamp. daily_at is "HH:MM" for daily schedules."""
now = time.time()
if last_run is None:
if daily_at:
# First run: today at HH:MM, or tomorrow if already past
parts = daily_at.split(":")
hour, minute = int(parts[0]), int(parts[1])
from datetime import datetime
today = datetime.fromtimestamp(now)
target = today.replace(hour=hour, minute=minute, second=0, microsecond=0)
ts = target.timestamp()
if ts <= now:
from datetime import timedelta
target = target + timedelta(days=1)
ts = target.timestamp()
return ts
return now + interval_seconds
if daily_at:
from datetime import datetime, timedelta
last_dt = datetime.fromtimestamp(last_run)
parts = daily_at.split(":")
hour, minute = int(parts[0]), int(parts[1])
next_dt = last_dt.replace(hour=hour, minute=minute, second=0, microsecond=0)
if next_dt <= last_dt:
next_dt = next_dt + timedelta(days=1)
return next_dt.timestamp()
return last_run + interval_seconds
class ScheduleStore:
def __init__(self, path: str):
self._path = Path(path)
self._path.parent.mkdir(parents=True, exist_ok=True)
self._schedules: list[dict] = []
self._lock = threading.Lock()
self._callbacks: list = []
self._load()
def _load(self):
if not self._path.exists():
return
try:
raw = json.loads(self._path.read_text("utf-8"))
if isinstance(raw, list):
self._schedules = raw
except (json.JSONDecodeError, TypeError):
self._schedules = []
def _save(self):
self._path.write_text(
json.dumps(self._schedules, indent=2, ensure_ascii=False) + "\n",
"utf-8",
)
def on_change(self, callback):
"""Register callback(action, schedule) on any change."""
self._callbacks.append(callback)
def _fire(self, action: str, schedule: dict):
for cb in self._callbacks:
try:
cb(action, schedule)
except Exception:
pass
def list_all(self, active_only: bool = False) -> list[dict]:
with self._lock:
result = list(self._schedules)
if active_only:
result = [s for s in result if s.get("active", True)]
return result
def get(self, schedule_id: str) -> dict | None:
with self._lock:
for s in self._schedules:
if s.get("id") == schedule_id:
return dict(s)
return None
def create(
self,
prompt: str,
targets: list[str],
channel: str = "general",
interval_seconds: int | None = None,
daily_at: str | None = None,
one_shot: bool = False,
send_at: float | None = None,
created_by: str = "user",
) -> dict:
"""Create a schedule. Either interval_seconds or daily_at must be set.
If one_shot=True, the schedule auto-deletes after firing once.
If send_at is provided (epoch), use it as next_run directly.
"""
schedule_id = str(uuid.uuid4())[:8]
now = time.time()
last_run = None
if daily_at:
interval_seconds = 86400
if send_at:
next_run = send_at
else:
next_run = compute_next_run(
interval_seconds or 86400,
last_run,
daily_at=daily_at,
)
with self._lock:
s = {
"id": schedule_id,
"prompt": prompt.strip()[:500],
"targets": [t.strip().lstrip("@") for t in targets if t.strip()],
"channel": channel or "general",
"interval_seconds": interval_seconds or 86400,
"daily_at": daily_at,
"next_run": next_run,
"created_at": now,
"last_run": None,
"active": True,
"one_shot": one_shot,
"created_by": created_by,
}
self._schedules.append(s)
self._save()
self._fire("create", s)
return dict(s)
def run_due(self) -> list[dict]:
"""Return list of schedules that are due (next_run <= now). Does not update."""
now = time.time()
with self._lock:
due = [s for s in self._schedules if s.get("active") and s.get("next_run", 0) <= now]
return [dict(s) for s in due]
def mark_run(self, schedule_id: str) -> dict | None:
"""Mark schedule as run, advance next_run. Returns updated schedule."""
with self._lock:
for s in self._schedules:
if s.get("id") != schedule_id:
continue
now = time.time()
s["last_run"] = now
s["next_run"] = compute_next_run(
s.get("interval_seconds", 3600),
now,
daily_at=s.get("daily_at"),
)
self._save()
result = dict(s)
break
else:
return None
self._fire("update", result)
return result
def delete(self, schedule_id: str) -> dict | None:
with self._lock:
for i, s in enumerate(self._schedules):
if s.get("id") == schedule_id:
removed = self._schedules.pop(i)
self._save()
result = dict(removed)
break
else:
return None
self._fire("delete", result)
return result
def toggle(self, schedule_id: str) -> dict | None:
with self._lock:
for s in self._schedules:
if s.get("id") == schedule_id:
s["active"] = not s.get("active", True)
self._save()
result = dict(s)
break
else:
return None
self._fire("update", result)
return result