-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpgmb.sql
More file actions
1100 lines (996 loc) · 33 KB
/
pgmb.sql
File metadata and controls
1100 lines (996 loc) · 33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
to explain inner fns: https://stackoverflow.com/a/30547418
-- Enable auto_explain for debugging
LOAD 'auto_explain';
SET auto_explain.log_nested_statements = 'on';
SET auto_explain.log_min_duration = 0;
SET client_min_messages TO log;
*/
-- all pgmb objects live in their own schema, keeping the host
-- database's default namespace clean
CREATE SCHEMA IF NOT EXISTS "pgmb";
-- the rest of this install script creates objects unqualified,
-- so they land in the pgmb schema
SET search_path TO pgmb;
-- create the configuration table for pgmb ----------------
-- enum of all recognised configuration keys; using an enum (rather
-- than free text) makes a typo in a config key a hard error
CREATE TYPE config_type AS ENUM(
  'plugin_version',
  -- how long to retain old partitions?
  -- partitions older than this interval will be deleted
  'partition_retention_period',
  -- how far into the future to create partitions
  'future_intervals_to_create',
  'partition_interval',
  -- max number of unread events handled per poll_for_events call
  'poll_chunk_size',
  'use_pg_cron',
  'pg_cron_poll_for_events_cron',
  'pg_cron_partition_maintenance_cron'
);
-- one row per key; values are stored as TEXT and cast by consumers
-- (interval, int, boolean) at the point of use
CREATE TABLE IF NOT EXISTS config(
  -- unique identifier for the subscription config
  id config_type PRIMARY KEY,
  value TEXT
);
-- Look up a single configuration value by its enum key.
-- Returns NULL when no row exists for the key (STRICT only
-- guards against a NULL argument, not a missing row).
CREATE OR REPLACE FUNCTION get_config_value(
  config_id config_type
) RETURNS TEXT AS $$
  SELECT c.value
  FROM config AS c
  WHERE c.id = config_id
$$ LANGUAGE sql STRICT STABLE PARALLEL SAFE SET SEARCH_PATH TO pgmb;
-- seed default configuration. Note: 'use_pg_cron' is intentionally
-- absent here; it is inserted by the pg_cron detection block below,
-- which fires the cron-management trigger.
INSERT INTO config(id, value) VALUES
  ('plugin_version', '0.2.0'),
  ('partition_retention_period', '60 minutes'),
  ('future_intervals_to_create', '3 hours'),
  ('partition_interval', '30 minutes'),
  ('poll_chunk_size', '10000'),
  ('pg_cron_poll_for_events_cron', '1 second'),
  -- at minute 0 of every hour
  ('pg_cron_partition_maintenance_cron', '0 * * * *');
-- we'll create the events table next & its functions ---------------
-- event IDs are fixed-width 24-character strings (see create_event_id)
CREATE DOMAIN event_id AS VARCHAR(24);
-- fn to create a random bigint, used as the random suffix of event IDs.
CREATE OR REPLACE FUNCTION create_random_bigint()
RETURNS BIGINT AS $$
BEGIN
  -- the random suffix occupies up to 9 hex chars of the ID. We start
  -- with at most 0xffffff8 (just under 2^28, i.e. 7 hex digits of
  -- randomness) so batch inserts can keep incrementing the value for
  -- subsequent messages without outgrowing the field.
  -- NOTE(review): the 0x... literal requires PostgreSQL 16+ --
  -- confirm the minimum supported server version.
  RETURN (random() * 0xffffff8)::BIGINT;
END
$$ LANGUAGE plpgsql VOLATILE PARALLEL SAFE;
-- Creates a timestamped event ID. It is a 24-character string
-- that consists of:
-- 1. 'pm' prefix
-- 2. 13-character hex representation of the timestamp in microseconds
-- 3. remaining random
-- Because the timestamp is a fixed-width prefix, IDs sort
-- lexicographically in chronological order.
CREATE OR REPLACE FUNCTION create_event_id(ts timestamptz, rand bigint)
RETURNS event_id AS $$
SELECT substr(
  -- ensure we're always 24 characters long by right-padding with '0's
  'pm'
  -- we'll give 13 hex characters for microsecond timestamp
  || lpad(to_hex((extract(epoch from ts) * 1000000)::bigint), 13, '0')
  -- fill remaining with randomness (rpad also truncates to 9 chars
  -- if the random value's hex form is longer)
  || rpad(to_hex(rand), 9, '0'),
  1,
  24
)
$$ LANGUAGE sql IMMUTABLE STRICT PARALLEL SAFE SECURITY DEFINER
SET search_path TO pgmb;
-- default for events.id: an ID from the current clock time plus a
-- fresh random suffix. clock_timestamp() (unlike NOW()) advances
-- within a transaction, so IDs in one transaction are not all equal.
CREATE OR REPLACE FUNCTION create_event_id_default()
RETURNS event_id AS $$
SELECT create_event_id(clock_timestamp(), create_random_bigint())
$$ LANGUAGE sql VOLATILE STRICT PARALLEL SAFE SECURITY DEFINER
SET search_path TO pgmb;
-- fn to extract the date from a message ID.
-- chars 3..15 hold the microsecond epoch in hex (see create_event_id).
CREATE OR REPLACE FUNCTION extract_date_from_event_id(id event_id)
RETURNS TIMESTAMPTZ AS $$
-- NOTE(review): casting a '0x...' string to numeric relies on
-- non-decimal integer literal support (PostgreSQL 16+) -- confirm
-- the minimum supported server version.
SELECT to_timestamp(('0x' || substr(id, 3, 13))::numeric / 1000000)
$$ LANGUAGE sql IMMUTABLE PARALLEL SAFE SECURITY INVOKER
SET search_path TO pgmb;
CREATE DOMAIN subscription_id AS VARCHAR(24);
-- append-only event log. Event IDs embed a timestamp prefix, so
-- RANGE-partitioning on "id" is effectively time-partitioning;
-- partitions are created/dropped by maintain_append_only_table.
CREATE TABLE IF NOT EXISTS events(
  id event_id PRIMARY KEY DEFAULT create_event_id_default(),
  topic VARCHAR(255) NOT NULL,
  payload JSONB NOT NULL,
  metadata JSONB,
  -- if an event is directed to a specific subscription,
  -- this field will be set to that subscription's ID
  subscription_id subscription_id
) PARTITION BY RANGE (id);
-- work queue of event IDs not yet dispatched to subscriptions.
-- UNLOGGED: contents are lost on a crash (undelivered events would
-- then be dropped) in exchange for much cheaper writes -- presumably
-- an accepted trade-off; confirm with the delivery guarantees needed.
CREATE UNLOGGED TABLE IF NOT EXISTS unread_events (
  event_id event_id PRIMARY KEY
) WITH (
  -- tune autovacuum for high insert & delete rates
  autovacuum_vacuum_scale_factor = 0.01,
  autovacuum_vacuum_threshold = 5000,
  autovacuum_analyze_scale_factor = 0.005,
  autovacuum_analyze_threshold = 1000,
  autovacuum_vacuum_cost_delay = 0
);
-- Statement-level AFTER INSERT trigger body for "events": copies the
-- IDs of all newly inserted rows into unread_events, the work queue
-- that "poll_for_events" drains when dispatching to subscriptions.
CREATE OR REPLACE FUNCTION mark_events_as_unread()
RETURNS TRIGGER AS $$
BEGIN
  INSERT INTO unread_events(event_id)
  SELECT ins.id
  FROM NEW AS ins;
  -- return value of a statement-level AFTER trigger is ignored
  RETURN NULL;
END
$$ LANGUAGE plpgsql VOLATILE PARALLEL UNSAFE
SET search_path TO pgmb;
-- fire once per INSERT statement (not per row); all inserted rows are
-- exposed to the trigger fn via the "NEW" transition table
CREATE TRIGGER mark_events_as_unread_trigger
AFTER INSERT ON events
REFERENCING NEW TABLE AS NEW
FOR EACH STATEMENT
EXECUTE FUNCTION mark_events_as_unread();
-- Builds the partition name for a table & timestamp,
-- e.g. ("events", 2024-01-01 12:30) -> "events_202401011230".
-- The fixed-width suffix makes names sort chronologically.
-- NOTE(review): regclass-to-text output depends on search_path
-- (schema qualification), so IMMUTABLE is not strictly accurate --
-- presumably fine while everything runs with search_path = pgmb.
CREATE OR REPLACE FUNCTION get_time_partition_name(
  table_id regclass,
  ts timestamptz
) RETURNS TEXT AS $$
SELECT table_id || '_' || to_char(ts, 'YYYYMMDDHH24MI')
$$ LANGUAGE sql IMMUTABLE STRICT PARALLEL SAFE;
-- finds the series of contiguous partitions and their bounds:
-- a gaps-and-islands query over the children's FOR VALUES ranges,
-- returning one row per contiguous run with the run's overall bounds
-- and the OIDs of the partitions composing it.
CREATE OR REPLACE FUNCTION get_partitions_and_bounds(
  table_id regclass
) RETURNS TABLE(
  lower_bound event_id,
  upper_bound event_id,
  partition_ids oid[]
) AS $$
-- parse each child partition's bounds out of its
-- "FOR VALUES FROM ('...') TO ('...')" clause text
WITH partitions AS (
  select
    pc.oid,
    REGEXP_MATCH(
      pg_get_expr(pc.relpartbound, pc.oid),
      '^FOR VALUES FROM \(''(.*)''\) TO \(''(.*)''\)$'
    ) AS bounds
  from pg_inherits pts
  inner join pg_class pc on pc.oid = pts.inhrelid
  where pts.inhparent = table_id
),
-- from: https://dba.stackexchange.com/a/101010
-- "step" is non-NULL exactly where a gap starts, i.e. the previous
-- partition's upper bound sorts below this partition's lower bound
ordered_intervals AS (
  SELECT
    *,
    (LAG(bounds[2]) OVER (ORDER BY bounds[1]) < bounds[1] OR NULL) as step
  FROM partitions
),
-- a running count of the gap markers assigns each contiguous island
-- of partitions its own group number
grouped_intervals AS (
  select *, count(step) over (order by bounds[1]) as grp
  from ordered_intervals
)
select
  MIN(bounds[1]),
  MAX(bounds[2]),
  array_agg(oid)
FROM grouped_intervals
GROUP BY grp;
$$ LANGUAGE sql STABLE PARALLEL SAFE SECURITY INVOKER
SET search_path TO pgmb;
-- Partition maintenance for a table partitioned by RANGE on an event_id
-- column. Creates partitions covering the current interval through
-- current + future_interval, and drops partitions older than
-- retention_period. Interval sizes are configured via the "config"
-- table (see maintain_append_only_table).
CREATE OR REPLACE FUNCTION maintain_time_partitions_using_event_id(
  table_id regclass,
  partition_interval INTERVAL,
  future_interval INTERVAL,
  retention_period INTERVAL,
  -- optional SQL run after each partition is created; the literal text
  -- '$1' inside it is replaced with the new partition's name
  additional_sql TEXT DEFAULT NULL,
  current_ts timestamptz DEFAULT NOW()
)
RETURNS void AS $$
DECLARE
  -- align to the partition grid so repeated calls agree on boundaries
  ts_trunc timestamptz := date_bin(partition_interval, current_ts, '2000-1-1');
  oldest_pt_to_keep text := pgmb
    .get_time_partition_name(table_id, ts_trunc - retention_period);
  -- per-table advisory lock so concurrent maintenance runs don't race
  lock_key CONSTANT BIGINT :=
    hashtext('pgmb.maintain_tp.' || table_id::text);
  ranges_to_create tstzrange[];
  partitions_to_drop regclass[];
  p_to_drop regclass;
  cur_range tstzrange;
BEGIN
  ASSERT partition_interval >= interval '1 minute',
    'partition_interval must be at least 1 minute';
  ASSERT future_interval >= partition_interval,
    'future_interval must be at least as large as partition_interval';
  IF NOT pg_try_advisory_xact_lock(lock_key) THEN
    -- another process is already maintaining partitions for this table
    RETURN;
  END IF;
  -- find all intervals we need to create partitions for:
  -- (wanted future ranges) minus (ranges already covered by partitions)
  WITH existing_part_ranges AS (
    SELECT
      tstzrange(
        extract_date_from_event_id(lower_bound),
        extract_date_from_event_id(upper_bound),
        '[]'
      ) as range
    FROM pgmb.get_partitions_and_bounds(table_id)
  ),
  future_tzs AS (
    SELECT
      tstzrange(dt, dt + partition_interval, '[]') AS range
    FROM generate_series(
      ts_trunc,
      ts_trunc + future_interval,
      partition_interval
    ) AS gs(dt)
  ),
  diffs AS (
    SELECT
      CASE WHEN epr.range IS NOT NULL
        THEN (ftz.range::tstzmultirange - epr.range::tstzmultirange)
        ELSE ftz.range::tstzmultirange
      END AS ranges
    FROM future_tzs ftz
    LEFT JOIN existing_part_ranges epr ON ftz.range && epr.range
  )
  select ARRAY_AGG(u.range) FROM diffs
  CROSS JOIN LATERAL unnest(diffs.ranges) AS u(range)
  INTO ranges_to_create;
  ranges_to_create := COALESCE(ranges_to_create, ARRAY[]::tstzrange[]);
  -- partitions whose name sorts below the oldest-to-keep name have
  -- fully expired (names sort chronologically)
  SELECT ARRAY_AGG(inhrelid::regclass) INTO partitions_to_drop
  FROM pg_catalog.pg_inherits
  WHERE inhparent = table_id
  AND inhrelid::regclass::text < oldest_pt_to_keep;
  partitions_to_drop := COALESCE(partitions_to_drop, ARRAY[]::regclass[]);
  -- check if nothing to do
  IF
    array_length(partitions_to_drop, 1) = 0
    AND array_length(ranges_to_create, 1) = 0
  THEN
    RETURN;
  END IF;
  -- create missing partitions, from now up to future_interval
  FOREACH cur_range IN ARRAY ranges_to_create LOOP
    DECLARE
      start_ev_id event_id := pgmb.create_event_id(lower(cur_range), 0);
      end_ev_id event_id := pgmb.create_event_id(upper(cur_range), 0);
      pt_name TEXT := pgmb.get_time_partition_name(table_id, lower(cur_range));
    BEGIN
      RAISE NOTICE 'creating partition "%". start: %, end: %',
        pt_name, lower(cur_range), upper(cur_range);
      -- %s (not %I) for the regclass: its text form is already quoted
      -- and possibly schema-qualified; %I would wrap "schema.table"
      -- into a single (broken) identifier
      EXECUTE FORMAT(
        'CREATE TABLE %I PARTITION OF %s FOR VALUES FROM (%L) TO (%L)',
        pt_name, table_id, start_ev_id, end_ev_id
      );
      IF additional_sql IS NOT NULL THEN
        -- plain text substitution; '$1' is NOT a bind parameter here
        EXECUTE REPLACE(additional_sql, '$1', pt_name);
      END IF;
    END;
  END LOOP;
  -- Drop old partitions. %s for the same regclass-quoting reason.
  FOREACH p_to_drop IN ARRAY partitions_to_drop LOOP
    EXECUTE format('DROP TABLE %s', p_to_drop);
  END LOOP;
END;
$$ LANGUAGE plpgsql VOLATILE PARALLEL UNSAFE SECURITY DEFINER
-- pin search_path: a SECURITY DEFINER function should not resolve
-- unqualified names through the caller's search_path (matches the
-- other SECURITY DEFINER functions in this file)
SET search_path TO pgmb;
-- Returns the partition of "table_id" that would hold an event created
-- at current_ts: the latest child whose name sorts at or before the
-- partition name for that timestamp.
-- NOTE(review): the final ordering is by OID (inhrelid), not by name --
-- this assumes partitions are always created in chronological order;
-- confirm that holds after catch-up maintenance runs.
CREATE OR REPLACE FUNCTION get_current_partition(
  table_id regclass,
  current_ts timestamptz DEFAULT NOW()
) RETURNS regclass AS $$
SELECT inhrelid::regclass
FROM pg_catalog.pg_inherits
WHERE inhparent = table_id
AND inhrelid::regclass::text
  <= pgmb.get_time_partition_name(table_id, current_ts)
ORDER BY inhrelid DESC
LIMIT 1
$$ LANGUAGE sql STABLE PARALLEL SAFE SECURITY DEFINER;
-- subscriptions table and related functions ----------------
CREATE DOMAIN group_id AS VARCHAR(48);
-- subscription IDs reuse the event-ID layout (timestamp + randomness)
-- but with an 'su' prefix instead of 'pm'
CREATE OR REPLACE FUNCTION create_subscription_id()
RETURNS subscription_id AS $$
SELECT 'su' || substring(
  create_event_id(NOW(), create_random_bigint())
  FROM 3
);
$$ LANGUAGE sql VOLATILE STRICT PARALLEL SAFE SECURITY DEFINER
SET search_path TO pgmb;
-- subscription, groups tables and functions will go here ----------------
-- a group is a set of subscriptions whose events are read together;
-- its cursor (last_read_event_id) tracks how far the group has consumed
CREATE TABLE subscription_groups(
  id group_id PRIMARY KEY,
  created_at TIMESTAMPTZ DEFAULT NOW(),
  -- defaults to "now", so a new group only sees events from its
  -- creation time onwards
  last_read_event_id event_id DEFAULT create_event_id(NOW(), 0)
);
CREATE TABLE subscriptions (
  -- unique identifier for the subscription
  id subscription_id PRIMARY KEY DEFAULT create_subscription_id(),
  -- define how the subscription is grouped. subscriptions belonging
  -- to the same group are read in one batch.
  group_id group_id NOT NULL REFERENCES subscription_groups(id)
  ON DELETE RESTRICT,
  -- A SQL expression that will be used to filter events for this subscription.
  -- The events table will be aliased as "e" in this expression. The subscription
  -- table is available as "s".
  -- Example: "e.topic = s.params->>'topic'",
  conditions_sql TEXT NOT NULL DEFAULT 'TRUE',
  -- params will be indexed, and can be used to store
  -- additional parameters for the subscription's conditions_sql.
  -- It's more efficient to have the same conditions_sql for multiple
  -- subscriptions, and differentiate them using params.
  params JSONB NOT NULL DEFAULT '{}'::jsonb,
  -- de-duplicates logically-identical subscriptions within a group.
  -- NOTE(review): hashtext/jsonb_hash are 32-bit hashes, so distinct
  -- subscriptions can collide on this UNIQUE column -- confirm that
  -- risk is acceptable.
  identity bigint GENERATED ALWAYS AS (
    hashtext(
      group_id
      || '/' || conditions_sql
      || '/' || jsonb_hash(params)::text
    )
  ) STORED UNIQUE,
  -- when was this subscription last active
  last_active_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  -- how long before this subscription expires since
  -- its last_active_at time. NULL means it never expires.
  expiry_interval INTERVAL
);
-- immutable fn to add interval to timestamptz
-- NOTE(review): timestamptz + interval actually depends on the session
-- TimeZone setting (e.g. across DST changes), so IMMUTABLE is a
-- deliberate stretch to make the expression usable in the index below
-- -- confirm all writing sessions share the same TimeZone.
CREATE FUNCTION add_interval_imm(tstz TIMESTAMPTZ, itvl INTERVAL)
RETURNS TIMESTAMPTZ AS $$
SELECT tstz + itvl;
$$ LANGUAGE sql IMMUTABLE PARALLEL SAFE
SET search_path TO pgmb;
-- note: index to quickly find expired subscriptions, not creating
-- a column separately because there's some weird deadlock issue
-- when creating a separate generated "expires_at" column.
CREATE INDEX ON subscriptions(
  group_id,
  add_interval_imm(last_active_at, expiry_interval)
) WHERE expiry_interval IS NOT NULL;
-- fastupdate=false, slows down subscription creation, but ensures the costlier
-- "poll_for_events" function is executed faster.
-- btree_gin allows the scalar conditions_sql column to participate in
-- a GIN index alongside the jsonb params column
CREATE EXTENSION IF NOT EXISTS btree_gin;
CREATE INDEX "sub_gin" ON subscriptions
USING GIN(conditions_sql, params) WITH (fastupdate = false);
-- materialized view to hold distinct conditions_sql statements.
-- We utilise changes in this view to determine when to prepare the
-- "poll_for_events" function.
CREATE MATERIALIZED VIEW IF NOT EXISTS subscription_cond_sqls AS (
  SELECT DISTINCT conditions_sql FROM subscriptions
  ORDER BY conditions_sql
);
-- a unique index is required for REFRESH ... CONCURRENTLY
CREATE UNIQUE INDEX IF NOT EXISTS
subscription_cond_sqls_idx ON subscription_cond_sqls(conditions_sql);
-- subscription events holds the events dispatched to each subscription
-- for each group. Like events, this is also an insert-only table. Groups
-- move their cursors forward as they read events. We can implement very safe
-- cursor movement as only a single writer (poll_for_events) writes to this table,
-- and multiple readers read from it.
CREATE TABLE IF NOT EXISTS subscription_events(
  -- dispatch-time event ID; its timestamp prefix makes the RANGE
  -- partitioning on "id" time-based
  id event_id,
  group_id group_id,
  -- the original events.id being dispatched
  event_id event_id,
  subscription_id subscription_id
) PARTITION BY RANGE (id);
-- supports read_next_events' (group_id, id > cursor) scans
CREATE INDEX IF NOT EXISTS subscription_events_group_idx
ON subscription_events(group_id, id);
-- Create a role with minimal access to the database. As we deal with
-- custom SQL quite often, we don't want an accidentally malicious or bad
-- SQL to have too much access to the database.
DO $$
BEGIN
IF NOT EXISTS (
  SELECT 1 FROM pg_roles WHERE rolname = 'pgmb_reader'
) THEN
  CREATE ROLE pgmb_reader NOLOGIN NOSUPERUSER NOCREATEDB
    NOCREATEROLE NOINHERIT NOREPLICATION;
  -- membership lets the installing user SET ROLE pgmb_reader below
  GRANT pgmb_reader TO CURRENT_USER;
END IF;
END
$$;
-- Give schema usage access
GRANT USAGE, CREATE ON SCHEMA pgmb TO pgmb_reader;
GRANT SELECT ON TABLE events TO pgmb_reader;
GRANT SELECT ON TABLE config TO pgmb_reader;
GRANT SELECT ON TABLE subscriptions TO pgmb_reader;
-- poll_for_events (owned by pgmb_reader) consumes this queue
GRANT SELECT, UPDATE, DELETE ON TABLE unread_events TO pgmb_reader;
-- Grant insert-only access to "subscription_events"
-- NOTE(review): UPDATE is also granted here despite the "insert-only"
-- comment -- confirm whether UPDATE is actually required.
GRANT UPDATE, INSERT ON TABLE subscription_events TO pgmb_reader;
-- Temporarily become the low-privilege role so the function below is
-- OWNED BY pgmb_reader: being SECURITY DEFINER, it will then execute
-- the user-supplied conditions_sql with only pgmb_reader's rights.
SET ROLE pgmb_reader;
-- This trigger puts the conditions_sql through a syntax check
-- by planning/executing it as a join condition against empty
-- events/subscriptions recordsets; a bad expression raises here
-- and rejects the row before it is stored.
CREATE OR REPLACE FUNCTION validate_subscription_conditions_sql()
RETURNS TRIGGER AS $$
BEGIN
  EXECUTE 'SELECT * FROM jsonb_populate_recordset(NULL::pgmb.events, ''[]'') e
  INNER JOIN jsonb_populate_recordset(NULL::pgmb.subscriptions, ''[{}]'') s
  ON ' || NEW.conditions_sql;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql STABLE PARALLEL SAFE
SET search_path TO pgmb
SECURITY DEFINER;
RESET ROLE;
CREATE TRIGGER validate_subscription_conditions_sql_trigger
BEFORE INSERT OR UPDATE ON subscriptions
FOR EACH ROW
EXECUTE FUNCTION validate_subscription_conditions_sql();
-- poll_for_events function template. As we add/remove different subscriptions,
-- we'll prepare a new version of the "poll_for_events" function with
-- the updated "conditions_sql" statements.
-- A template function is used so we get syntax highlighting and checking
-- when editing this function.
-- Returns the number of rows inserted into subscription_events.
CREATE OR REPLACE FUNCTION poll_for_events_tmpl()
RETURNS INT AS $body$
DECLARE
  read_ids event_id[];
  max_id event_id;
  min_id event_id;
  chunk_size INT := get_config_value('poll_chunk_size')::INT;
  inserted_rows integer;
  -- random base for generated subscription_events IDs; incremented
  -- per inserted row so IDs within the batch stay distinct
  start_num BIGINT := create_random_bigint();
  write_start TIMESTAMPTZ;
  lock_key CONSTANT BIGINT :=
    hashtext('pgmb.poll_for_events');
BEGIN
  -- transaction-scoped advisory lock: at most one poller at a time
  IF NOT pg_try_advisory_xact_lock(lock_key) THEN
    -- another process is already polling for events
    RETURN 0;
  END IF;
  -- atomically claim & delete up to chunk_size unread event IDs.
  -- SKIP LOCKED avoids blocking on rows another session holds; the
  -- "< now" bound leaves just-created IDs for the next poll.
  WITH to_delete AS (
    SELECT td.event_id
    FROM unread_events td
    WHERE td.event_id < create_event_id(NOW(), 0)
    FOR UPDATE SKIP LOCKED
    LIMIT chunk_size
  ),
  deleted AS (
    DELETE FROM unread_events re
    USING to_delete td
    WHERE re.event_id = td.event_id
  )
  SELECT
    MAX(event_id),
    MIN(event_id),
    ARRAY_AGG(event_id)
  INTO max_id, min_id, read_ids
  FROM to_delete;
  IF max_id IS NULL THEN
    RETURN 0;
  END IF;
  write_start := clock_timestamp();
  -- fetch the claimed events; the min/max bounds let the planner prune
  -- partitions (same trick as read_events)
  WITH read_events AS (
    SELECT e.*
    FROM events e
    INNER JOIN unnest(read_ids) r(id) ON e.id = r.id
    WHERE e.id <= max_id AND e.id >= min_id
  )
  -- fan each event out to every matching subscription
  INSERT INTO subscription_events(id, group_id, subscription_id, event_id)
  SELECT
    create_event_id(write_start, start_num + row_number() OVER ()),
    s.group_id,
    s.id,
    e.id
  FROM read_events e
  INNER JOIN subscriptions s ON
    -- directed events go only to their target subscription...
    s.id = e.subscription_id
    OR (
      -- ...broadcast events go to subscriptions whose conditions match
      e.subscription_id IS NULL
      AND (
        -- Do not edit this line directly. Will be replaced
        -- in the prepared function.
        TRUE -- CONDITIONS_SQL_PLACEHOLDER --
      )
    )
  ON CONFLICT DO NOTHING;
  GET DIAGNOSTICS inserted_rows = ROW_COUNT;
  -- return total inserted events
  RETURN inserted_rows;
END;
$body$ LANGUAGE plpgsql VOLATILE STRICT PARALLEL UNSAFE
SET search_path TO pgmb
SECURITY DEFINER;
-- Builds & installs the real "poll_for_events" function from the
-- template above: the placeholder is replaced with a disjunction of
-- all distinct conditions_sql statements, each guarded by
-- "s.conditions_sql = <that statement>" so a subscription is only
-- matched by its own condition.
CREATE OR REPLACE FUNCTION prepare_poll_for_events_fn(
  sql_statements TEXT[]
) RETURNS VOID AS $$
DECLARE
  tmpl_proc_name constant TEXT :=
    'poll_for_events_tmpl';
  tmpl_proc_placeholder constant TEXT :=
    'TRUE -- CONDITIONS_SQL_PLACEHOLDER --';
  condition_sql TEXT;
  proc_src TEXT;
BEGIN
  IF sql_statements = '{}' THEN
    -- no subscriptions, so just use 'FALSE' to avoid any matches
    sql_statements := ARRAY['FALSE'];
  END IF;
  -- build the condition SQL:
  --   ((stmt1) AND s.conditions_sql = 'stmt1') OR ((stmt2) AND ...)
  -- the %L placeholders are filled in order by the VARIADIC argument
  condition_sql := FORMAT(
    '('
    || array_to_string(
      ARRAY(
        SELECT
          '(' || stmt || ') AND s.conditions_sql = %L'
        FROM unnest(sql_statements) AS arr(stmt)
      ),
      ') OR ('
    )
    || ')',
    VARIADIC sql_statements
  );
  -- timestamp comment marks when this version was generated
  condition_sql := FORMAT('/* updated at %s */', NOW()) || condition_sql;
  -- fetch the source of the template procedure
  select pg_get_functiondef(oid) INTO proc_src
  from pg_proc where proname = tmpl_proc_name and
    pronamespace = 'pgmb'::regnamespace;
  IF proc_src IS NULL THEN
    RAISE EXCEPTION 'Template procedure % not found', tmpl_proc_name;
  END IF;
  -- replace the placeholder with the actual condition SQL
  -- NOTE(review): conditions_sql is spliced into the function source
  -- verbatim; safety rests on the validation trigger plus
  -- pgmb_reader's minimal grants -- confirm that trust model.
  proc_src := REPLACE(proc_src, tmpl_proc_placeholder, condition_sql);
  proc_src := REPLACE(proc_src, tmpl_proc_name, 'poll_for_events');
  -- the new poll_for_events function will be created with
  -- the pgmb_reader role, to avoid a bad "conditions_sql"
  -- from having any destructive access to the database.
  EXECUTE proc_src;
  -- changing the owner will ensure that the function is executed with
  -- the pgmb_reader's permissions.
  -- https://www.postgresql.org/docs/current/sql-alterfunction.html
  EXECUTE 'ALTER FUNCTION poll_for_events() OWNER TO pgmb_reader';
END;
$$ LANGUAGE plpgsql VOLATILE STRICT PARALLEL UNSAFE
SET search_path TO pgmb
SECURITY INVOKER;
-- install the initial (no subscriptions) version
SELECT prepare_poll_for_events_fn('{}'::text[]);
-- we'll prepare the subscription read statement whenever subscriptions
-- are created/updated/deleted. The materialized view acts as a cheap
-- change detector: poll_for_events is only regenerated when the set of
-- distinct conditions_sql values actually changes.
CREATE OR REPLACE FUNCTION refresh_subscription_read_statements()
RETURNS TRIGGER AS $$
DECLARE
  -- NOTE(review): needs_refresh is unused
  needs_refresh BOOLEAN := FALSE;
  old_conditions_sql TEXT[];
  conditions_sql TEXT[];
BEGIN
  -- snapshot the distinct conditions before the refresh
  old_conditions_sql := ARRAY(
    SELECT * FROM subscription_cond_sqls
    ORDER BY conditions_sql
  );
  REFRESH MATERIALIZED VIEW CONCURRENTLY subscription_cond_sqls;
  conditions_sql := ARRAY(
    SELECT * FROM subscription_cond_sqls
    ORDER BY conditions_sql
  );
  -- conditions_sql hasn't changed, no need to refresh the
  -- poll_for_events function
  IF conditions_sql = old_conditions_sql THEN
    RETURN NULL;
  END IF;
  PERFORM prepare_poll_for_events_fn(conditions_sql);
  RETURN NULL;
END
$$ LANGUAGE plpgsql VOLATILE PARALLEL UNSAFE
SET search_path TO pgmb
SECURITY INVOKER;
CREATE TRIGGER refresh_subscription_read_statements_trigger
AFTER INSERT OR UPDATE OR DELETE ON subscriptions
FOR EACH STATEMENT
EXECUTE FUNCTION refresh_subscription_read_statements();
-- Utility fn to read events by their IDs. This exists as postgres
-- doesn't correctly filter which partitions to read from when using
-- an IN/JOIN clause on a partitioned table.
-- Returns the matching events ordered by ID; unknown IDs are skipped.
CREATE OR REPLACE FUNCTION read_events(
  event_ids event_id[]
) RETURNS SETOF events AS $$
DECLARE
  max_id event_id;
  min_id event_id;
BEGIN
  -- fast path for an empty array. cardinality() is used because
  -- array_length() returns NULL (not 0) for '{}', so the previous
  -- "array_length(event_ids, 1) = 0" guard could never fire.
  IF cardinality(event_ids) = 0 THEN
    RETURN;
  END IF;
  -- get min and max ids, allows PG to correctly prune partitions
  SELECT
    MAX(eid),
    MIN(eid)
  INTO max_id, min_id
  FROM unnest(event_ids) AS u(eid);
  RETURN QUERY
  SELECT e.*
  FROM events e
  INNER JOIN unnest(event_ids) AS u(eid) ON e.id = u.eid
  WHERE e.id <= max_id AND e.id >= min_id
  ORDER BY u.eid;
END;
$$ LANGUAGE plpgsql STRICT STABLE PARALLEL SAFE
SET search_path TO pgmb;
-- fn to read next events for a subscription group.
-- Returns the batch of events after "cursor" along with the
-- subscription IDs each event was dispatched to, plus next_cursor to
-- persist via set_group_cursor once the batch has been processed.
CREATE OR REPLACE FUNCTION read_next_events(
  gid VARCHAR(48),
  cursor event_id DEFAULT NULL,
  chunk_size INT DEFAULT get_config_value('poll_chunk_size')::INT,
  -- if peek is true, we do not require having to acquire the advisory
  -- lock to read events. Useful to debug without blocking other readers.
  peek BOOLEAN DEFAULT FALSE
) RETURNS TABLE(
  id event_id,
  topic VARCHAR(255),
  payload JSONB,
  metadata JSONB,
  subscription_ids subscription_id[],
  next_cursor event_id
) AS $$
DECLARE
  lock_key CONSTANT BIGINT :=
    hashtext('pgmb.read_next_events.' || gid);
BEGIN
  -- provide a lock for the group, so that if we temporarily
  -- or accidentally have multiple readers for the same group,
  -- they don't interfere with each other.
  -- This is a session-level lock, released by release_group_lock or
  -- set_group_cursor(..., release_lock := true).
  -- NOTE(review): with peek = TRUE the lock is still attempted (and
  -- possibly acquired) before the AND is evaluated -- confirm peek
  -- callers are expected to release it.
  IF NOT pg_try_advisory_lock(lock_key) AND NOT peek THEN
    RETURN;
  END IF;
  -- fetch the cursor to read from
  -- if no cursor is provided, fetch from the group's last read event id
  IF cursor IS NULL THEN
    SELECT sc.last_read_event_id
    FROM subscription_groups sc
    WHERE sc.id = gid
    INTO cursor;
  END IF;
  -- if still null, don't return anything
  IF cursor IS NULL THEN
    RETURN;
  END IF;
  RETURN QUERY WITH next_events AS (
    -- up to chunk_size dispatched rows after the cursor. The upper
    -- bound at "now" leaves concurrently-inserted rows for the next
    -- read; the join drops rows for since-deleted subscriptions.
    SELECT
      se.id,
      se.event_id,
      se.subscription_id
    FROM subscription_events se
    INNER JOIN subscriptions s ON s.id = se.subscription_id
    WHERE se.group_id = gid
    AND se.id < create_event_id(NOW(), 0)
    AND se.id > cursor
    LIMIT chunk_size
  ),
  -- collapse per-subscription rows into one row per event
  next_events_grp AS (
    SELECT
      ne.event_id,
      ARRAY_AGG(ne.subscription_id) AS subscription_ids
    FROM next_events ne
    GROUP BY ne.event_id
    ORDER BY ne.event_id
  )
  SELECT
    e.id,
    e.topic,
    e.payload,
    e.metadata,
    ne.subscription_ids,
    -- next cursor = largest subscription_events.id in this batch
    (SELECT MAX(ne2.id)::event_id FROM next_events ne2)
  FROM read_events(ARRAY(SELECT ne.event_id FROM next_events_grp ne)) e
  INNER JOIN next_events_grp ne ON ne.event_id = e.id;
END
$$ LANGUAGE plpgsql STABLE PARALLEL SAFE
SET search_path TO pgmb
SECURITY INVOKER;
-- Re-reads up to max_events events previously dispatched to the given
-- subscription in the given group, starting after from_event_id.
-- Raises if more than max_events rows match, so callers replay in
-- bounded batches.
CREATE OR REPLACE FUNCTION replay_events(
  gid VARCHAR(48),
  sid VARCHAR(24),
  from_event_id event_id,
  max_events INT
) RETURNS SETOF events AS $$
DECLARE
  event_ids event_id[];
  now_id event_id := create_event_id(NOW(), 0);
BEGIN
  -- Collect up to max_events + 1 candidate IDs (the extra row detects
  -- overflow). The LIMIT must live in a subquery: previously it was
  -- attached to the aggregating query itself, where it limited the
  -- single ARRAY_AGG output row and therefore had no effect at all.
  SELECT ARRAY_AGG(lim.event_id) INTO event_ids
  FROM (
    SELECT se.event_id
    FROM subscription_events se
    WHERE se.group_id = gid
    AND se.subscription_id = sid
    AND se.event_id > from_event_id
    AND se.event_id <= now_id
    -- we filter "id" by the same range too, because
    -- 1. the format of se.id and e.id are the same. And rows are
    -- inserted into the se table after the corresponding e row is created,
    -- so if we find rows > from_event_id in se.event_id, the corresponding
    -- e.id will also be > from_event_id
    -- 2. it helps postgres prune which partitions to read from
    AND se.id <= now_id
    AND se.id > from_event_id
    LIMIT (max_events + 1)
  ) AS lim;
  IF array_length(event_ids, 1) > max_events THEN
    RAISE EXCEPTION
      'Too many events to replay. Please replay in smaller batches.';
  END IF;
  RETURN QUERY SELECT * FROM read_events(event_ids);
END $$ LANGUAGE plpgsql STABLE PARALLEL SAFE
SET search_path TO pgmb
SECURITY INVOKER;
-- Releases the session-level advisory lock that read_next_events takes
-- for a group (no-op if this session does not hold it).
CREATE OR REPLACE FUNCTION release_group_lock(gid VARCHAR(48))
RETURNS VOID AS $$
BEGIN
  -- same key derivation as read_next_events
  PERFORM pg_advisory_unlock(hashtext('pgmb.read_next_events.' || gid));
END
$$ LANGUAGE plpgsql VOLATILE PARALLEL UNSAFE
SET search_path TO pgmb;
-- upsert the group's cursor
-- (called by consumers after processing a batch from read_next_events)
CREATE OR REPLACE FUNCTION set_group_cursor(
  gid VARCHAR(48),
  new_cursor event_id,
  -- if true, release any existing lock for this group
  release_lock BOOLEAN
) RETURNS VOID AS $$
BEGIN
  -- upsert the new cursor
  INSERT INTO subscription_groups(id, last_read_event_id)
  VALUES (gid, new_cursor)
  ON CONFLICT (id) DO UPDATE
  SET last_read_event_id = EXCLUDED.last_read_event_id;
  -- release any existing lock for this group, if we hold one
  -- (the session-level lock taken by read_next_events)
  IF release_lock THEN
    PERFORM release_group_lock(gid);
  END IF;
END
$$ LANGUAGE plpgsql VOLATILE PARALLEL UNSAFE
SET search_path TO pgmb;
-- contains fn to maintain partitions for an append-only table, this
-- can be used for both "events" and "subscription_events" tables.
-- It trims old partitions that are outside the retention period, and creates new
-- ones. Also ensures partitions aren't autovacuumed.
-- All sizing knobs are read from the "config" table.
CREATE FUNCTION maintain_append_only_table(
  tbl regclass,
  current_ts timestamptz DEFAULT NOW()
)
RETURNS VOID AS $$
SELECT maintain_time_partitions_using_event_id(
  tbl,
  partition_interval := get_config_value('partition_interval')::interval,
  future_interval := get_config_value('future_intervals_to_create')::interval,
  retention_period := get_config_value('partition_retention_period')::interval,
  -- turn off autovacuum on the events table, since we're not
  -- going to be updating/deleting rows from it.
  -- Also set fillfactor to 100 since we're only inserting.
  -- ('$1' is replaced textually with each new partition's name)
  additional_sql := 'ALTER TABLE $1 SET(
    fillfactor = 100,
    autovacuum_enabled = false,
    toast.autovacuum_enabled = false
  );',
  current_ts := current_ts
);
$$ LANGUAGE sql VOLATILE PARALLEL UNSAFE SECURITY DEFINER
SET search_path TO pgmb;
-- partition-maintenance entry point (scheduled via pg_cron below).
-- NOTE(review): the plain SET (not SET LOCAL) persists for the session
-- after the procedure returns; SET LOCAL would not survive the COMMITs
-- below -- confirm the lingering session-wide change is acceptable.
CREATE OR REPLACE PROCEDURE maintain_events_table(
  current_ts timestamptz DEFAULT NOW()
) AS $$
BEGIN
  SET search_path TO pgmb;
  -- we commit after each maintainance function to release locks on the
  -- partitions as soon as possible. This avoids blocking "poll_for_events",
  -- & "read_next_events" functions, which when executing all concurrently,
  -- may cause deadlocks due to lock contention on the partitions.
  PERFORM maintain_append_only_table('events'::regclass, current_ts);
  COMMIT;
  PERFORM maintain_append_only_table('subscription_events'::regclass, current_ts);
  COMMIT;
END;
$$ LANGUAGE plpgsql;
-- bootstrap: create the initial partitions for both partitioned tables
-- so inserts work immediately after installation
SELECT maintain_append_only_table('events'::regclass);
SELECT maintain_append_only_table('subscription_events'::regclass);
-- setup pg_cron if it's available ----------------
-- (Un)schedules the pg_cron jobs whenever cron-related config changes:
-- schedules polling + partition maintenance when use_pg_cron = 'true',
-- otherwise unschedules them (if the cron schema exists at all).
CREATE OR REPLACE FUNCTION manage_cron_jobs_trigger_fn()
RETURNS TRIGGER AS $$
DECLARE
  poll_job_name CONSTANT TEXT := 'pgmb_poll';
  maintain_job_name CONSTANT TEXT := 'pgmb_maintain_table_partitions';
BEGIN
  IF get_config_value('use_pg_cron') = 'true' THEN
    -- Schedule/update event polling job
    PERFORM cron.schedule(
      poll_job_name,
      get_config_value('pg_cron_poll_for_events_cron'),
      $CMD$
      -- ensure we don't accidentally run for too long
      SET SESSION statement_timeout = '10s';
      SELECT pgmb.poll_for_events();
      $CMD$
    );
    RAISE LOG 'Scheduled pgmb polling job: %', poll_job_name;
    -- Schedule/update partition maintenance job
    -- (use the constant; previously this duplicated the string literal)
    PERFORM cron.schedule(
      maintain_job_name,
      get_config_value('pg_cron_partition_maintenance_cron'),
      $CMD$ CALL pgmb.maintain_events_table(); $CMD$
    );
    RAISE LOG 'Scheduled pgmb partition maintenance job: %',
      maintain_job_name;
  ELSIF (SELECT 1 FROM pg_namespace WHERE nspname = 'cron') THEN
    RAISE LOG 'Unscheduling pgmb cron jobs.';
    -- NOTE(review): some pg_cron versions raise an error when
    -- unscheduling a job name that does not exist -- confirm the
    -- targeted pg_cron version tolerates missing jobs here.
    PERFORM cron.unschedule(poll_job_name);
    PERFORM cron.unschedule(maintain_job_name);
  END IF;
  RETURN NULL;
END;
$$ LANGUAGE plpgsql VOLATILE PARALLEL UNSAFE SECURITY DEFINER
SET search_path TO pgmb;
-- re-evaluate cron scheduling whenever a cron-related config key is
-- inserted or its value updated
CREATE TRIGGER manage_cron_jobs_trigger
AFTER INSERT OR UPDATE OF value ON config
FOR EACH ROW
WHEN (
  NEW.id IN (
    'use_pg_cron',
    'pg_cron_poll_for_events_cron',
    'pg_cron_partition_maintenance_cron'
  )
)
EXECUTE FUNCTION manage_cron_jobs_trigger_fn();
-- Detect pg_cron availability and record it in config. Inserting
-- 'use_pg_cron' fires manage_cron_jobs_trigger, which performs the
-- actual (un)scheduling.
DO $$
BEGIN
IF (
  SELECT true
  FROM pg_available_extensions
  WHERE name = 'pg_cron'
) AND (
  -- ensure the current database is where pg_cron can be installed
  current_database()
  = coalesce(current_setting('cron.database_name', true), 'postgres')
) THEN
  CREATE EXTENSION IF NOT EXISTS pg_cron;
  INSERT INTO config(id, value) VALUES ('use_pg_cron', 'true');
ELSE
  RAISE LOG 'pg_cron extension not available. Skipping pg_cron setup.';
  INSERT INTO config(id, value) VALUES ('use_pg_cron', 'false');
END IF;
END
$$;
-- triggers to add events for specific tables ---------------------------
-- Function to create a topic string for subscriptions.
-- Eg. "public" "contacts" "INSERT" -> "public.contacts.insert"
CREATE OR REPLACE FUNCTION create_topic(
  schema_name name,
  table_name name,
  kind varchar(16)
) RETURNS varchar(255) AS $$
  -- STRICT already maps any NULL argument to a NULL result, so
  -- concat_ws's NULL-skipping behaviour never comes into play here
  SELECT lower(concat_ws('.', schema_name, table_name, kind))
$$ LANGUAGE sql IMMUTABLE STRICT PARALLEL SAFE;
-- Creates a function to compute the difference between two JSONB objects.
-- Left-sided: returns key/value pairs present in "a" but not in "b".
-- Treats 'null' values, and non-existent keys as equal.
-- Eg. jsonb_diff('{"a": 1, "b": 2, "c": null}', '{"a": 1, "b": null}') = '{"b": 2}'
-- Note: returns SQL NULL (not '{}') when there is no difference, since
-- jsonb_object_agg over zero rows yields NULL.
CREATE OR REPLACE FUNCTION jsonb_diff(a jsonb, b jsonb)
RETURNS jsonb AS $$
  SELECT jsonb_object_agg(key, value) FROM (
    SELECT key, value FROM jsonb_each(a) WHERE value != 'null'::jsonb
    EXCEPT
    SELECT key, value FROM jsonb_each(b) WHERE value != 'null'::jsonb
  ) AS diff -- alias is mandatory on PostgreSQL < 16
$$ LANGUAGE sql IMMUTABLE STRICT PARALLEL SAFE;
-- Function to serialise a record for an event, and tell us
-- whether to emit the event or not.
-- Note: Regardless of whether to emit the event, the serialised
-- JSONB is returned.
-- By default, we always emit the event. This is an extension point:
-- redefine it to customise per-table serialisation or filtering
-- ("tabl"/"op" identify the source table & operation; unused here).
CREATE OR REPLACE FUNCTION serialise_record_for_event(
  tabl oid,
  op TEXT,
  record RECORD,
  serialised OUT JSONB,
  emit OUT BOOLEAN
) AS $$
BEGIN
  serialised := to_jsonb(record);
  emit := TRUE;
  RETURN;
END
$$ LANGUAGE plpgsql IMMUTABLE STRICT PARALLEL SAFE SECURITY INVOKER;
-- Trigger that pushes changes to the events table
CREATE OR REPLACE FUNCTION push_table_event()
RETURNS TRIGGER AS $$
DECLARE
start_num BIGINT = create_random_bigint();
BEGIN
IF TG_OP = 'INSERT' THEN
INSERT INTO events(id, topic, payload)
SELECT
create_event_id(clock_timestamp(), rand := start_num + row_number() OVER ()),
create_topic(TG_TABLE_SCHEMA, TG_TABLE_NAME, TG_OP),
jsonb_strip_nulls(s.data)
FROM NEW n
CROSS JOIN LATERAL
serialise_record_for_event(TG_RELID, TG_OP, n) AS s(data, emit)
WHERE s.emit;
ELSIF TG_OP = 'DELETE' THEN
INSERT INTO events(id, topic, payload)
SELECT
create_event_id(clock_timestamp(), rand := start_num + row_number() OVER ()),
create_topic(TG_TABLE_SCHEMA, TG_TABLE_NAME, TG_OP),
jsonb_strip_nulls(to_jsonb(s.data))