Skip to content

Commit bbdce4b

Browse files
authored
ENG-447 Adding last sync to sync task return value (#216)
* ENG-447 Adding last sync to sync task return value Also incorporated updated documentation (ENG-443)
1 parent 9504ec5 commit bbdce4b

File tree

4 files changed

+144
-19
lines changed

4 files changed

+144
-19
lines changed
File renamed without changes.
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Sync information
2+
3+
The `sync_info` table is meant to always be accessed through one of these two functions: `propose_sync_task` and `end_sync_task`.
4+
This acts as a semaphore, so that two workers (e.g. the roam plugin on two different browsers) do not try to run the same sync task at the same time. So you need to give the function `propose_sync_task` enough information to distinguish what you mean to do:
5+
6+
1. The `target`, e.g. the database Id of the scope of the task, usually a space, but it could be a single content or concept (for reactive updates)
7+
2. a `function` name, to distinguish different tasks on the same target; e.g. adding vs deleting content. (arbitrary short string)
8+
3. the `worker` name: random string, should be the same between calls.
9+
10+
Further, you must specify the `timeout` (>= 1s), after which a running task is deemed to have failed, and the `task_interval` (>= 5s), which is how often the task should be run. (The `task_interval` must be more than twice the `timeout`.)
11+
12+
When a worker calls `propose_sync_task`, it will receive either:
13+
14+
1. a timestamp in the future, meaning that the task is already being run by another worker, or has been run more recently than task_interval, and this worker should not attempt to run this task again before the given timestamp;
15+
2. a timestamp in the past, which is also the last time the task was executed successfully. Your worker can ask the platform for all changes made after that time. (This will only be reliable if you always make your queries after calling this function!)
16+
3. Null, meaning the task was not executed successfully before, and your worker is tasked with starting from scratch.
17+
18+
When a worker finishes the task, it should clean up by calling `end_sync_task` with the same identifying arguments (`target`, `function`, `worker`) and a final status (`'complete'` or `'failed'`).
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
DROP FUNCTION IF EXISTS public.propose_sync_task;

-- Ask to run the sync task identified by (s_target, s_function) as worker s_worker.
--
-- Parameters:
--   s_target       id of the scope being synced (sync_info.sync_target).
--   s_function     short name distinguishing tasks on the same target.
--   s_worker       name of the calling worker; stored on the row when the task is claimed.
--   timeout        how long a claimed task may run before it is considered timed out (>= 1s).
--   task_interval  minimum delay between runs (>= 5s, and more than twice the timeout).
--                  It is multiplied by (1 + failure_count) as a basic backoff.
--
-- Returns:
--   * a timestamp that is not in the past: the caller did NOT get the task (it is running
--     elsewhere or was attempted too recently) and should not retry before that time;
--   * a timestamp in the past: the caller got the task; the value is the start time of the
--     previous run, which ended with status 'complete';
--   * NULL: the caller got the task and no usable last-success time is known (first run, or
--     the previous attempt did not complete), so it should start from scratch.
--
-- Raises ASSERT_FAILURE when the interval arguments violate the constraints above.
CREATE OR REPLACE FUNCTION public.propose_sync_task(
    s_target bigint,
    s_function character varying,
    s_worker character varying,
    "timeout" interval,
    "task_interval" interval
) RETURNS timestamp with time zone
LANGUAGE plpgsql
AS $$
DECLARE s_id INTEGER;
DECLARE start_time TIMESTAMP WITH TIME ZONE := now();
DECLARE t_status task_status;
DECLARE t_failure_count SMALLINT;
DECLARE t_last_task_start TIMESTAMP WITH TIME ZONE;
DECLARE t_last_task_end TIMESTAMP WITH TIME ZONE;
DECLARE t_times_out_at TIMESTAMP WITH TIME ZONE;
DECLARE t_last_attempt TIMESTAMP WITH TIME ZONE;
DECLARE is_running BOOLEAN;
DECLARE result TIMESTAMP WITH TIME ZONE;
BEGIN
    ASSERT timeout * 2 < task_interval;
    ASSERT timeout >= '1s'::interval;
    ASSERT task_interval >= '5s'::interval;

    -- Try to create the row; the unique index on (sync_target, sync_function)
    -- makes this atomic, so only one caller can ever see a non-null id here.
    INSERT INTO public.sync_info (sync_target, sync_function, status, worker, last_task_start, task_times_out_at)
    VALUES (s_target, s_function, 'active', s_worker, start_time, start_time + timeout)
    ON CONFLICT (sync_target, sync_function) DO NOTHING
    RETURNING id INTO s_id;

    IF s_id IS NOT NULL THEN
        -- Brand-new row: the caller owns the task. The row we just inserted is the only
        -- one for this (target, function) and it is 'active', so there is no previous
        -- successful run to report.
        RETURN NULL;
    END IF;

    -- The row pre-existed; it may already be active.
    SELECT id INTO STRICT s_id
    FROM public.sync_info
    WHERE sync_target = s_target
      AND sync_function = s_function;

    -- Transaction-level advisory lock: released automatically at commit or rollback, so it
    -- cannot leak if a statement below raises, and it is not released before the UPDATE
    -- below has been committed.
    PERFORM pg_advisory_xact_lock(s_id);

    SELECT status, failure_count, last_task_start, last_task_end, task_times_out_at
    INTO t_status, t_failure_count, t_last_task_start, t_last_task_end, t_times_out_at
    FROM public.sync_info
    WHERE id = s_id;

    -- A task that is 'active' and still within its timeout window belongs to another worker.
    is_running := coalesce(t_status = 'active' AND start_time <= t_times_out_at, false);

    -- An 'active' task whose deadline has passed is counted as a failure.
    IF t_status = 'active' AND t_last_task_start >= coalesce(t_last_task_end, t_last_task_start) AND start_time > t_times_out_at THEN
        t_status := 'timeout';
        t_failure_count := t_failure_count + 1;
    END IF;

    -- basic backoff
    task_interval := task_interval * (1 + t_failure_count);

    -- Most recent activity on the task. GREATEST ignores NULLs, so this is the later of the
    -- last start and the last successful end; a plain COALESCE would ignore a start that is
    -- more recent than the last recorded end.
    t_last_attempt := greatest(t_last_task_end, t_last_task_start);

    IF NOT is_running AND t_last_attempt + task_interval < start_time THEN
        -- We are ready to take on the task. Report the start of the previous run if it
        -- completed; after a failed or timed-out attempt that start time has been
        -- overwritten, so result stays NULL (start from scratch).
        IF t_status = 'complete' THEN
            result := t_last_task_start;
        END IF;
        UPDATE public.sync_info
        SET worker = s_worker,
            status = 'active',
            task_times_out_at = start_time + timeout,
            last_task_start = start_time,
            failure_count = t_failure_count
        WHERE id = s_id;
    ELSE
        -- The task is running elsewhere or has been tried recently enough.
        IF t_status = 'timeout' THEN
            -- persist the timeout detected above
            UPDATE public.sync_info
            SET status = t_status,
                failure_count = t_failure_count
            WHERE id = s_id;
        END IF;
        -- Earliest time at which the caller may propose again: never before the running
        -- worker's deadline, and never before the (backed-off) interval has elapsed.
        result := greatest(
            t_last_attempt + task_interval,
            CASE WHEN is_running THEN t_times_out_at END
        );
    END IF;

    RETURN result;
END;
$$;
70+
71+
-- Record the outcome of the sync task identified by (s_target, s_function).
--
-- Parameters:
--   s_target    id of the scope being synced (sync_info.sync_target).
--   s_function  short name distinguishing tasks on the same target.
--   s_worker    name of the worker reporting; must match the worker stored on the row.
--   s_status    final status; must compare greater than 'active' and not lower than the
--               status currently stored (comparison follows the task_status ordering).
--
-- Effects: on 'complete', last_task_end is set to now() and failure_count is reset to 0;
-- otherwise failure_count is incremented unless the row already carries that status.
-- task_times_out_at is always cleared.
--
-- Raises NO_DATA_FOUND / TOO_MANY_ROWS (INTO STRICT) when the row lookup does not match
-- exactly one row, and ASSERT_FAILURE when one of the checks below fails.
CREATE OR REPLACE FUNCTION public.end_sync_task(
    s_target bigint,
    s_function character varying,
    s_worker character varying,
    s_status public.task_status
) RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
    row_id INTEGER;
    owner_name varchar;
    prev_status task_status;
    failures SMALLINT;
    ended_at TIMESTAMP WITH TIME ZONE;
BEGIN
    -- Load the current state of the task row.
    SELECT si.id, si.worker, si.status, si.failure_count, si.last_task_end
    INTO STRICT row_id, owner_name, prev_status, failures, ended_at
    FROM public.sync_info AS si
    WHERE si.sync_target = s_target
      AND si.sync_function = s_function;

    -- Only a terminal status may be reported, only by the owning worker,
    -- and the status may never move backwards.
    ASSERT s_status > 'active';
    ASSERT owner_name = s_worker, 'Wrong worker';
    ASSERT s_status >= prev_status, 'do not go back in status';

    IF s_status = 'complete' THEN
        -- success: stamp the end time and clear the failure streak
        ended_at := now();
        failures := 0;
    ELSIF prev_status != s_status THEN
        -- a failure that has not been counted yet for this status
        failures := failures + 1;
    END IF;

    UPDATE public.sync_info
    SET status = s_status,
        task_times_out_at = NULL,
        last_task_end = ended_at,
        failure_count = failures
    WHERE id = row_id;
END;
$$;

packages/database/supabase/schemas/sync.sql

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,6 @@ CREATE UNIQUE INDEX sync_info_u_idx ON public.sync_info USING btree (
4444
"sync_target", sync_function
4545
);
4646

47-
set search_path to public, extensions ;
48-
49-
5047
CREATE OR REPLACE FUNCTION public.end_sync_task(
5148
s_target bigint,
5249
s_function character varying,
@@ -63,7 +60,7 @@ DECLARE t_last_task_end TIMESTAMP WITH TIME ZONE;
6360
BEGIN
6461
SELECT id, worker, status, failure_count, last_task_end
6562
INTO STRICT t_id, t_worker, t_status, t_failure_count, t_last_task_end
66-
FROM sync_info WHERE sync_target = s_target AND sync_function = s_function;
63+
FROM public.sync_info WHERE sync_target = s_target AND sync_function = s_function;
6764
ASSERT s_status > 'active';
6865
ASSERT t_worker = s_worker, 'Wrong worker';
6966
ASSERT s_status >= t_status, 'do not go back in status';
@@ -76,7 +73,7 @@ BEGIN
7673
END IF;
7774
END IF;
7875

79-
UPDATE sync_info
76+
UPDATE public.sync_info
8077
SET status = s_status,
8178
task_times_out_at=null,
8279
last_task_end=t_last_task_end,
@@ -99,37 +96,40 @@ CREATE OR REPLACE FUNCTION public.propose_sync_task(
9996
s_worker character varying,
10097
"timeout" interval,
10198
"task_interval" interval
102-
) RETURNS interval
99+
) RETURNS timestamp with time zone
103100
LANGUAGE plpgsql
104101
AS $$
105102
DECLARE s_id INTEGER;
106-
DECLARE start_time TIMESTAMP WITH TIME ZONE;
103+
DECLARE start_time TIMESTAMP WITH TIME ZONE := now();
107104
DECLARE t_status task_status;
108105
DECLARE t_failure_count SMALLINT;
109106
DECLARE t_last_task_start TIMESTAMP WITH TIME ZONE;
110107
DECLARE t_last_task_end TIMESTAMP WITH TIME ZONE;
111108
DECLARE t_times_out_at TIMESTAMP WITH TIME ZONE;
112-
DECLARE result INTERVAL = NULL;
109+
DECLARE result TIMESTAMP WITH TIME ZONE;
113110
BEGIN
114111
ASSERT timeout * 2 < task_interval;
115112
ASSERT timeout >= '1s'::interval;
116113
ASSERT task_interval >= '5s'::interval;
117-
start_time := now();
118-
INSERT INTO sync_info (sync_target, sync_function, status, worker, last_task_start, task_times_out_at)
114+
INSERT INTO public.sync_info (sync_target, sync_function, status, worker, last_task_start, task_times_out_at)
119115
VALUES (s_target, s_function, 'active', s_worker, start_time, start_time+timeout)
120-
ON CONFLICT DO NOTHING
116+
ON CONFLICT (sync_target, sync_function) DO NOTHING
121117
RETURNING id INTO s_id;
122-
-- zut il renvoie null...
123118
IF s_id IS NOT NULL THEN
124119
-- totally new_row, I'm on the task
125-
RETURN NULL;
120+
-- return last time it was run successfully
121+
SELECT max(last_task_start) INTO result FROM public.sync_info
122+
WHERE sync_target = s_target
123+
AND sync_function = s_function
124+
AND status = 'complete';
125+
RETURN result;
126126
END IF;
127127
-- now we know it pre-existed. Maybe already active.
128-
SELECT id INTO STRICT s_id FROM sync_info WHERE sync_target = s_target AND sync_function = s_function;
128+
SELECT id INTO STRICT s_id FROM public.sync_info WHERE sync_target = s_target AND sync_function = s_function;
129129
PERFORM pg_advisory_lock(s_id);
130130
SELECT status, failure_count, last_task_start, last_task_end, task_times_out_at
131131
INTO t_status, t_failure_count, t_last_task_start, t_last_task_end, t_times_out_at
132-
FROM sync_info
132+
FROM public.sync_info
133133
WHERE id = s_id;
134134

135135
IF t_status = 'active' AND t_last_task_start >= coalesce(t_last_task_end, t_last_task_start) AND start_time > t_times_out_at THEN
@@ -140,17 +140,17 @@ BEGIN
140140
task_interval := task_interval * (1+t_failure_count);
141141
IF coalesce(t_last_task_end, t_last_task_start) + task_interval < now() THEN
142142
-- we are ready to take on the task
143-
UPDATE sync_info
144-
SET worker=s_worker, status='active', task_times_out_at = now() + timeout, last_task_start = now(), failure_count=t_failure_count
143+
UPDATE public.sync_info
144+
SET worker=s_worker, status='active', task_times_out_at = now() + timeout, last_task_start = start_time, failure_count=t_failure_count
145145
WHERE id=s_id;
146146
ELSE
147147
-- the task has been tried recently enough
148148
IF t_status = 'timeout' THEN
149-
UPDATE sync_info
149+
UPDATE public.sync_info
150150
SET status=t_status, failure_count=t_failure_count
151151
WHERE id=s_id;
152152
END IF;
153-
result := coalesce(t_last_task_end, t_last_task_start) + task_interval - now();
153+
result := coalesce(t_last_task_end, t_last_task_start) + task_interval;
154154
END IF;
155155

156156
PERFORM pg_advisory_unlock(s_id);

0 commit comments

Comments
 (0)