|
| 1 | +defmodule Realtime.Tenants.Migrations.UseFullyQualifiedNames do |
| 2 | + @moduledoc false |
| 3 | + |
| 4 | + use Ecto.Migration |
| 5 | + |
| 6 | + def change do |
| 7 | + execute(" |
| 8 | +-- build_prepared_statement_sql function |
| 9 | +
|
| 10 | +CREATE OR REPLACE FUNCTION realtime.build_prepared_statement_sql(prepared_statement_name text, entity regclass, columns realtime.wal_column[]) |
| 11 | +RETURNS text |
| 12 | +LANGUAGE sql |
| 13 | +SET search_path = '' |
| 14 | +AS $function$ |
| 15 | + /* |
| 16 | + Builds a sql string that, if executed, creates a prepared statement to |
| 17 | + tests retrive a row from *entity* by its primary key columns. |
| 18 | + Example |
| 19 | + select realtime.build_prepared_statement_sql('public.notes', '{"id"}'::text[], '{"bigint"}'::text[]) |
| 20 | + */ |
| 21 | + select |
| 22 | + 'prepare ' || prepared_statement_name || ' as |
| 23 | + select |
| 24 | + exists( |
| 25 | + select |
| 26 | + 1 |
| 27 | + from |
| 28 | + ' || entity || ' |
| 29 | + where |
| 30 | + ' || pg_catalog.string_agg(pg_catalog.quote_ident(pkc.name) || '=' || pg_catalog.quote_nullable(pkc.value #>> '{}') , ' and ') || ' |
| 31 | + )' |
| 32 | + from |
| 33 | + pg_catalog.unnest(columns) pkc |
| 34 | + where |
| 35 | + pkc.is_pkey |
| 36 | + group by |
| 37 | + entity; |
| 38 | +$function$; |
| 39 | +
|
| 40 | +-- cast function |
| 41 | +
|
| 42 | +CREATE OR REPLACE FUNCTION realtime.\"cast\"(val text, type_ regtype) |
| 43 | + RETURNS jsonb |
| 44 | + LANGUAGE plpgsql |
| 45 | + SET search_path = '' |
| 46 | + IMMUTABLE |
| 47 | +AS $function$ |
| 48 | + declare |
| 49 | + res jsonb; |
| 50 | + begin |
| 51 | + execute pg_catalog.format('select to_jsonb(%L::'|| type_::text || ')', val) into res; |
| 52 | + return res; |
| 53 | + end |
| 54 | +$function$; |
| 55 | +
|
| 56 | +-- channel_name function |
| 57 | +
|
| 58 | +CREATE OR REPLACE FUNCTION realtime.channel_name() |
| 59 | + RETURNS text |
| 60 | + SET search_path = '' |
| 61 | + LANGUAGE sql |
| 62 | + STABLE |
| 63 | +AS $function$ |
| 64 | + select nullif(pg_catalog.current_setting('realtime.channel_name', true), '')::text; |
| 65 | +$function$; |
| 66 | +
|
| 67 | +-- check_equality_op function |
| 68 | +
|
| 69 | +CREATE OR REPLACE FUNCTION realtime.check_equality_op(op realtime.equality_op, type_ regtype, val_1 text, val_2 text) |
| 70 | + RETURNS boolean |
| 71 | + LANGUAGE plpgsql |
| 72 | + SET search_path = '' |
| 73 | + IMMUTABLE |
| 74 | +AS $function$ |
| 75 | + /* |
| 76 | + Casts *val_1* and *val_2* as type *type_* and check the *op* condition for truthiness |
| 77 | + */ |
| 78 | + declare |
| 79 | + op_symbol text = ( |
| 80 | + case |
| 81 | + when op = 'eq' then '=' |
| 82 | + when op = 'neq' then '!=' |
| 83 | + when op = 'lt' then '<' |
| 84 | + when op = 'lte' then '<=' |
| 85 | + when op = 'gt' then '>' |
| 86 | + when op = 'gte' then '>=' |
| 87 | + when op = 'in' then '= any' |
| 88 | + else 'UNKNOWN OP' |
| 89 | + end |
| 90 | + ); |
| 91 | + res boolean; |
| 92 | + begin |
| 93 | + execute pg_catalog.format( |
| 94 | + 'select %L::'|| type_::text || ' ' || op_symbol |
| 95 | + || ' ( %L::' |
| 96 | + || ( |
| 97 | + case |
| 98 | + when op = 'in' then type_::text || '[]' |
| 99 | + else type_::text end |
| 100 | + ) |
| 101 | + || ')', val_1, val_2) into res; |
| 102 | + return res; |
| 103 | + end; |
| 104 | + $function$; |
| 105 | +
|
| 106 | +-- is_visible_through_filters function |
| 107 | +
|
| 108 | +CREATE OR REPLACE FUNCTION realtime.is_visible_through_filters(columns realtime.wal_column[], filters realtime.user_defined_filter[]) |
| 109 | +RETURNS boolean |
| 110 | +LANGUAGE sql |
| 111 | +SET search_path = '' |
| 112 | +IMMUTABLE |
| 113 | +AS $function$ |
| 114 | + /* |
| 115 | + Should the record be visible (true) or filtered out (false) after *filters* are applied |
| 116 | + */ |
| 117 | + select |
| 118 | + -- Default to allowed when no filters present |
| 119 | + $2 is null -- no filters. this should not happen because subscriptions has a default |
| 120 | + or pg_catalog.array_length($2, 1) is null -- array length of an empty array is null |
| 121 | + or bool_and( |
| 122 | + coalesce( |
| 123 | + realtime.check_equality_op( |
| 124 | + op:=f.op, |
| 125 | + type_:=coalesce( |
| 126 | + col.type_oid::regtype, -- null when wal2json version <= 2.4 |
| 127 | + col.type_name::regtype |
| 128 | + ), |
| 129 | + -- cast jsonb to text |
| 130 | + val_1:=col.value #>> '{}', |
| 131 | + val_2:=f.value |
| 132 | + ), |
| 133 | + false -- if null, filter does not match |
| 134 | + ) |
| 135 | + ) |
| 136 | + from |
| 137 | + pg_catalog.unnest(filters) f |
| 138 | + join pg_catalog.unnest(columns) col |
| 139 | + on f.column_name = col.name; |
| 140 | +$function$; |
| 141 | +
|
| 142 | +-- quote_wal2json function |
| 143 | +CREATE OR REPLACE FUNCTION realtime.list_changes(publication name, slot_name name, max_changes integer, max_record_bytes integer) |
| 144 | + RETURNS SETOF realtime.wal_rls |
| 145 | + LANGUAGE sql |
| 146 | + SET search_path = '' |
| 147 | + SET log_min_messages TO 'fatal' |
| 148 | +AS $function$ |
| 149 | + with pub as ( |
| 150 | + select |
| 151 | + concat_ws( |
| 152 | + ',', |
| 153 | + case when bool_or(pubinsert) then 'insert' else null end, |
| 154 | + case when bool_or(pubupdate) then 'update' else null end, |
| 155 | + case when bool_or(pubdelete) then 'delete' else null end |
| 156 | + ) as w2j_actions, |
| 157 | + coalesce( |
| 158 | + string_agg( |
| 159 | + realtime.quote_wal2json(format('%I.%I', schemaname, tablename)::regclass), |
| 160 | + ',' |
| 161 | + ) filter (where ppt.tablename is not null and ppt.tablename not like '% %'), |
| 162 | + '' |
| 163 | + ) w2j_add_tables |
| 164 | + from |
| 165 | + pg_publication pp |
| 166 | + left join pg_publication_tables ppt |
| 167 | + on pp.pubname = ppt.pubname |
| 168 | + where |
| 169 | + pp.pubname = publication |
| 170 | + group by |
| 171 | + pp.pubname |
| 172 | + limit 1 |
| 173 | + ), |
| 174 | + w2j as ( |
| 175 | + select |
| 176 | + x.*, pub.w2j_add_tables |
| 177 | + from |
| 178 | + pub, |
| 179 | + pg_catalog.pg_logical_slot_get_changes( |
| 180 | + slot_name, null, max_changes, |
| 181 | + 'include-pk', 'true', |
| 182 | + 'include-transaction', 'false', |
| 183 | + 'include-timestamp', 'true', |
| 184 | + 'include-type-oids', 'true', |
| 185 | + 'format-version', '2', |
| 186 | + 'actions', pub.w2j_actions, |
| 187 | + 'add-tables', pub.w2j_add_tables |
| 188 | + ) x |
| 189 | + ) |
| 190 | + select |
| 191 | + xyz.wal, |
| 192 | + xyz.is_rls_enabled, |
| 193 | + xyz.subscription_ids, |
| 194 | + xyz.errors |
| 195 | + from |
| 196 | + w2j, |
| 197 | + realtime.apply_rls( |
| 198 | + wal := w2j.data::jsonb, |
| 199 | + max_record_bytes := max_record_bytes |
| 200 | + ) xyz(wal, is_rls_enabled, subscription_ids, errors) |
| 201 | + where |
| 202 | + w2j.w2j_add_tables <> '' |
| 203 | + and xyz.subscription_ids[1] is not null |
| 204 | + $function$; |
| 205 | +
|
| 206 | +-- quote_wal2json function |
| 207 | +
|
| 208 | +CREATE OR REPLACE FUNCTION realtime.quote_wal2json(entity regclass) |
| 209 | +RETURNS text |
| 210 | +LANGUAGE sql |
| 211 | +SET search_path = '' |
| 212 | +IMMUTABLE STRICT |
| 213 | +AS $function$ |
| 214 | + select |
| 215 | + ( |
| 216 | + select pg_catalog.string_agg('' || ch,'') |
| 217 | + from pg_catalog.unnest(pg_catalog.string_to_array(nsp.nspname::text, null)) with ordinality x(ch, idx) |
| 218 | + where |
| 219 | + not (x.idx = 1 and x.ch = '"') |
| 220 | + and not ( |
| 221 | + x.idx = pg_catalog.array_length(pg_catalog.string_to_array(nsp.nspname::text, null), 1) |
| 222 | + and x.ch = '"' |
| 223 | + ) |
| 224 | + ) |
| 225 | + || '.' |
| 226 | + || ( |
| 227 | + select string_agg('' || ch,'') |
| 228 | + from pg_catalog.unnest(pg_catalog.string_to_array(pc.relname::text, null)) with ordinality x(ch, idx) |
| 229 | + where |
| 230 | + not (x.idx = 1 and x.ch = '"') |
| 231 | + and not ( |
| 232 | + x.idx = pg_catalog.array_length(pg_catalog.string_to_array(nsp.nspname::text, null), 1) |
| 233 | + and x.ch = '"' |
| 234 | + ) |
| 235 | + ) |
| 236 | + from |
| 237 | + pg_class pc |
| 238 | + join pg_namespace nsp |
| 239 | + on pc.relnamespace = nsp.oid |
| 240 | + where |
| 241 | + pc.oid = entity |
| 242 | + $function$; |
| 243 | + |
| 244 | +-- subscription_check_filters function |
| 245 | + |
| 246 | +CREATE OR REPLACE FUNCTION realtime.subscription_check_filters() |
| 247 | +RETURNS trigger |
| 248 | +SET search_path = '' |
| 249 | +LANGUAGE plpgsql |
| 250 | +AS $function$ |
| 251 | + /* |
| 252 | + Validates that the user defined filters for a subscription: |
| 253 | + - refer to valid columns that the claimed role may access |
| 254 | + - values are coercable to the correct column type |
| 255 | + */ |
| 256 | + declare |
| 257 | + col_names text[] = coalesce( |
| 258 | + pg_catalog.array_agg(c.column_name order by c.ordinal_position), |
| 259 | + '{}'::text[] |
| 260 | + ) |
| 261 | + from |
| 262 | + information_schema.columns c |
| 263 | + where |
| 264 | + pg_catalog.format('%I.%I', c.table_schema, c.table_name)::regclass = new.entity |
| 265 | + and pg_catalog.has_column_privilege( |
| 266 | + (new.claims ->> 'role'), |
| 267 | + pg_catalog.format('%I.%I', c.table_schema, c.table_name)::regclass, |
| 268 | + c.column_name, |
| 269 | + 'SELECT' |
| 270 | + ); |
| 271 | + filter realtime.user_defined_filter; |
| 272 | + col_type regtype; |
| 273 | + |
| 274 | + in_val jsonb; |
| 275 | + begin |
| 276 | + for filter in select * from pg_catalog.unnest(new.filters) loop |
| 277 | + -- Filtered column is valid |
| 278 | + if not filter.column_name = any(col_names) then |
| 279 | + raise exception 'invalid column for filter %', filter.column_name; |
| 280 | + end if; |
| 281 | + |
| 282 | + -- Type is sanitized and safe for string interpolation |
| 283 | + col_type = ( |
| 284 | + select atttypid::regtype |
| 285 | + from pg_catalog.pg_attribute |
| 286 | + where attrelid = new.entity |
| 287 | + and attname = filter.column_name |
| 288 | + ); |
| 289 | + if col_type is null then |
| 290 | + raise exception 'failed to lookup type for column %', filter.column_name; |
| 291 | + end if; |
| 292 | + |
| 293 | + -- Set maximum number of entries for in filter |
| 294 | + if filter.op = 'in'::realtime.equality_op then |
| 295 | + in_val = realtime.cast(filter.value, (col_type::text || '[]')::regtype); |
| 296 | + if coalesce(pg_catalog.jsonb_array_length(in_val), 0) > 100 then |
| 297 | + raise exception 'too many values for `in` filter. Maximum 100'; |
| 298 | + end if; |
| 299 | + else |
| 300 | + -- raises an exception if value is not coercable to type |
| 301 | + perform realtime.cast(filter.value, col_type); |
| 302 | + end if; |
| 303 | + |
| 304 | + end loop; |
| 305 | + |
| 306 | + -- Apply consistent order to filters so the unique constraint on |
| 307 | + -- (subscription_id, entity, filters) can't be tricked by a different filter order |
| 308 | + new.filters = coalesce( |
| 309 | + pg_catalog.array_agg(f order by f.column_name, f.op, f.value), |
| 310 | + '{}' |
| 311 | + ) from pg_catalog.unnest(new.filters) f; |
| 312 | + |
| 313 | + return new; |
| 314 | + end; |
| 315 | + $function$; |
| 316 | + |
| 317 | +CREATE OR REPLACE FUNCTION realtime.to_regrole(role_name text) |
| 318 | +RETURNS regrole |
| 319 | +SET search_path = '' |
| 320 | +LANGUAGE sql |
| 321 | +IMMUTABLE |
| 322 | +AS $function$ select role_name::regrole $function$; |
| 323 | + ") |
| 324 | + end |
| 325 | + end |
0 commit comments