|
| 1 | +CREATE OR REPLACE FUNCTION public.propose_sync_task( |
| 2 | + s_target bigint, |
| 3 | + s_function character varying, |
| 4 | + s_worker character varying, |
| 5 | + "timeout" interval, |
| 6 | + "task_interval" interval |
| 7 | +) RETURNS timestamp with time zone |
| 8 | +SET search_path = '' |
| 9 | +LANGUAGE plpgsql |
| 10 | +AS $$ |
| 11 | +DECLARE s_id INTEGER; |
| 12 | +DECLARE start_time TIMESTAMP WITH TIME ZONE := now(); |
| 13 | +DECLARE t_status public.task_status; |
| 14 | +DECLARE t_failure_count SMALLINT; |
| 15 | +DECLARE t_last_task_start TIMESTAMP WITH TIME ZONE; |
| 16 | +DECLARE t_last_task_end TIMESTAMP WITH TIME ZONE; |
| 17 | +DECLARE t_times_out_at TIMESTAMP WITH TIME ZONE; |
| 18 | +DECLARE result TIMESTAMP WITH TIME ZONE; |
| 19 | +BEGIN |
| 20 | + ASSERT timeout * 2 < task_interval; |
| 21 | + ASSERT timeout >= '1s'::interval; |
| 22 | + ASSERT task_interval >= '5s'::interval; |
| 23 | + INSERT INTO public.sync_info (sync_target, sync_function, status, worker, last_task_start, task_times_out_at) |
| 24 | + VALUES (s_target, s_function, 'active', s_worker, start_time, start_time+timeout) |
| 25 | + ON CONFLICT (sync_target, sync_function) DO NOTHING |
| 26 | + RETURNING id INTO s_id; |
| 27 | + IF s_id IS NOT NULL THEN |
| 28 | + -- totally new_row, I'm on the task |
| 29 | + -- return last time it was run successfully |
| 30 | + SELECT max(last_task_start) INTO result FROM public.sync_info |
| 31 | + WHERE sync_target = s_target |
| 32 | + AND sync_function = s_function |
| 33 | + AND status = 'complete'; |
| 34 | + RETURN result; |
| 35 | + END IF; |
| 36 | + -- now we know it pre-existed. Maybe already active. |
| 37 | + SELECT id INTO STRICT s_id FROM public.sync_info WHERE sync_target = s_target AND sync_function = s_function; |
| 38 | + PERFORM pg_advisory_lock(s_id); |
| 39 | + SELECT status, failure_count, last_task_start, last_task_end, task_times_out_at |
| 40 | + INTO t_status, t_failure_count, t_last_task_start, t_last_task_end, t_times_out_at |
| 41 | + FROM public.sync_info |
| 42 | + WHERE id = s_id; |
| 43 | + |
| 44 | + IF t_status = 'active' AND t_last_task_start >= coalesce(t_last_task_end, t_last_task_start) AND start_time > t_times_out_at THEN |
| 45 | + t_status := 'timeout'; |
| 46 | + t_failure_count := t_failure_count + 1; |
| 47 | + END IF; |
| 48 | + -- basic backoff |
| 49 | + task_interval := task_interval * (1+t_failure_count); |
| 50 | + IF coalesce(t_last_task_end, t_last_task_start) + task_interval < now() THEN |
| 51 | + -- we are ready to take on the task |
| 52 | + SELECT max(last_task_start) INTO result FROM public.sync_info |
| 53 | + WHERE sync_target = s_target |
| 54 | + AND sync_function = s_function |
| 55 | + AND status = 'complete'; |
| 56 | + UPDATE public.sync_info |
| 57 | + SET worker=s_worker, status='active', task_times_out_at = now() + timeout, last_task_start = start_time, failure_count=t_failure_count |
| 58 | + WHERE id=s_id; |
| 59 | + ELSE |
| 60 | + -- the task has been tried recently enough |
| 61 | + IF t_status = 'timeout' THEN |
| 62 | + UPDATE public.sync_info |
| 63 | + SET status=t_status, failure_count=t_failure_count |
| 64 | + WHERE id=s_id; |
| 65 | + END IF; |
| 66 | + result := coalesce(t_last_task_end, t_last_task_start) + task_interval; |
| 67 | + END IF; |
| 68 | + |
| 69 | + PERFORM pg_advisory_unlock(s_id); |
| 70 | + RETURN result; |
| 71 | +END; |
| 72 | +$$; |
0 commit comments