25
25
26
26
{%- macro default__sat(src_pk, src_hashdiff, src_payload, src_extra_columns, src_eff, src_ldts, src_source, source_model) - %}
27
27
28
+ {%- set apply_source_filter = config .get (' apply_source_filter' , false) - %}
29
+ {%- set enable_ghost_record = var(' enable_ghost_records' , false) %}
30
+
28
31
{%- set source_cols = automate_dv .expand_column_list (columns= [src_pk, src_hashdiff, src_payload, src_extra_columns, src_eff, src_ldts, src_source]) - %}
29
32
{%- set window_cols = automate_dv .expand_column_list (columns= [src_pk, src_hashdiff, src_ldts]) - %}
30
33
{%- set pk_cols = automate_dv .expand_column_list (columns= [src_pk]) - %}
31
- {%- set enable_ghost_record = var(' enable_ghost_records' , false) %}
32
34
33
35
{%- if model .config .materialized == ' vault_insert_by_rank' %}
34
36
{%- set source_cols_with_rank = source_cols + [config .get (' rank_column' )] - %}
@@ -66,6 +68,21 @@ latest_records AS (
66
68
QUALIFY rank_num = 1
67
69
),
68
70
71
+ {%- if apply_source_filter %}
72
+
73
+ valid_stg AS (
74
+ SELECT {{ automate_dv .prefix (source_cols, ' s' , alias_target= ' source' ) }}
75
+ FROM source_data AS s
76
+ LEFT JOIN latest_records AS sat
77
+ ON {{ automate_dv .multikey (src_pk, prefix= [' s' , ' sat' ], condition= ' =' ) }}
78
+ WHERE {{ automate_dv .multikey (src_pk, prefix= ' sat' , condition= ' IS NULL' ) }}
79
+ OR {{ automate_dv .prefix ([src_ldts], ' s' ) }} > (
80
+ SELECT MAX ({{ src_ldts }}) FROM latest_records AS sat
81
+ WHERE {{ automate_dv .multikey (src_pk, prefix= [' sat' ,' s' ], condition= ' =' ) }}
82
+ )
83
+ ),
84
+ {%- endif %}
85
+
69
86
{%- endif %}
70
87
71
88
first_record_in_set AS (
@@ -75,14 +92,22 @@ first_record_in_set AS (
75
92
PARTITION BY {{ automate_dv .prefix ([src_pk], ' sd' , alias_target= ' source' ) }}
76
93
ORDER BY {{ automate_dv .prefix ([src_ldts], ' sd' , alias_target= ' source' ) }} ASC
77
94
) as asc_rank
95
+ {%- if automate_dv .is_any_incremental () and apply_source_filter %}
96
+ FROM valid_stg as sd
97
+ {%- else %}
78
98
FROM source_data as sd
99
+ {%- endif %}
79
100
QUALIFY asc_rank = 1
80
101
),
81
102
82
103
unique_source_records AS (
83
104
SELECT DISTINCT
84
105
{{ automate_dv .prefix (source_cols, ' sd' , alias_target= ' source' ) }}
106
+ {%- if automate_dv .is_any_incremental () and apply_source_filter %}
107
+ FROM valid_stg as sd
108
+ {%- else %}
85
109
FROM source_data as sd
110
+ {%- endif %}
86
111
QUALIFY {{ automate_dv .prefix ([src_hashdiff], ' sd' , alias_target= ' source' ) }} != LAG({{ automate_dv .prefix ([src_hashdiff], ' sd' , alias_target= ' source' ) }}) OVER (
87
112
PARTITION BY {{ automate_dv .prefix ([src_pk], ' sd' , alias_target= ' source' ) }}
88
113
ORDER BY {{ automate_dv .prefix ([src_ldts], ' sd' , alias_target= ' source' ) }} ASC )
@@ -93,9 +118,9 @@ unique_source_records AS (
93
118
94
119
ghost AS (
95
120
{{ automate_dv .create_ghost_record (src_pk= src_pk, src_hashdiff= src_hashdiff,
96
- src_payload= src_payload, src_extra_columns= src_extra_columns,
97
- src_eff= src_eff, src_ldts= src_ldts,
98
- src_source= src_source, source_model= source_model) }}
121
+ src_payload= src_payload, src_extra_columns= src_extra_columns,
122
+ src_eff= src_eff, src_ldts= src_ldts,
123
+ src_source= src_source, source_model= source_model) }}
99
124
),
100
125
101
126
{%- endif %}
@@ -113,7 +138,7 @@ records_to_insert AS (
113
138
SELECT {{ automate_dv .alias_all (source_cols, ' frin' ) }}
114
139
FROM first_record_in_set AS frin
115
140
{%- if automate_dv .is_any_incremental () %}
116
- LEFT JOIN LATEST_RECORDS lr
141
+ LEFT JOIN latest_records lr
117
142
ON {{ automate_dv .multikey (src_pk, prefix= [' lr' ,' frin' ], condition= ' =' ) }}
118
143
AND {{ automate_dv .prefix ([src_hashdiff], ' lr' , alias_target= ' target' ) }} = {{ automate_dv .prefix ([src_hashdiff], ' frin' ) }}
119
144
WHERE {{ automate_dv .prefix ([src_hashdiff], ' lr' , alias_target= ' target' ) }} IS NULL
0 commit comments