|
53 | 53 | {% set build_sql = create_table_as(False, target_relation, filtered_sql) %}
|
54 | 54 | {% else %}
|
55 | 55 |
|
56 |
| - {% set target_columns = adapter.get_columns_in_relation(target_relation) %} |
57 |
| - {%- set target_cols_csv = target_columns | map(attribute='quoted') | join(', ') -%} |
58 |
| - {%- set loop_vars = {'sum_rows_inserted': 0} -%} |
59 |
| - |
60 |
| - {% for i in range(min_max_ranks.max_rank | int ) -%} |
61 |
| - |
62 |
| - {%- set iteration_number = i + 1 -%} |
63 |
| - |
64 |
| - {%- set filtered_sql = dbtvault.replace_placeholder_with_rank_filter(sql, rank_column, iteration_number) -%} |
65 |
| - |
66 |
| - {{ dbt_utils.log_info("Running for {} {} of {} on column '{}' [{}]".format('rank', iteration_number, min_max_ranks.max_rank, rank_column, model.unique_id)) }} |
67 |
| - |
68 |
| - {% set tmp_relation = make_temp_relation(target_relation) %} |
69 |
| - |
70 |
| - {# This call statement drops and then creates a temporary table #} |
71 |
| - {# but MSSQL will fail to drop any temporary table created by a previous loop iteration #} |
72 |
| - {# See MSSQL note and drop code below #} |
73 |
| - {% call statement() -%} |
74 |
| - {{ create_table_as(True, tmp_relation, filtered_sql) }} |
| 56 | + {% if min_max_ranks.max_rank | int > 100000 %} |
| 57 | + {%- set error_message -%} |
| 58 | + 'Max iterations is 100,000. Consider using a different rank column |
| 59 | + or loading a smaller amount of data. |
| 60 | + vault_insert_by materialisations are not intended for this purpose, |
| 61 | + please see https://dbtvault.readthedocs.io/en/latest/materialisations/' |
| 62 | + {%- endset -%} |
| 63 | + |
| 64 | + {{- exceptions.raise_compiler_error(error_message) -}} |
| 65 | + {% else %} |
| 66 | + {% set target_columns = adapter.get_columns_in_relation(target_relation) %} |
| 67 | + {%- set target_cols_csv = target_columns | map(attribute='quoted') | join(', ') -%} |
| 68 | + {%- set loop_vars = {'sum_rows_inserted': 0} -%} |
| 69 | + |
| 70 | + {% for i in range(min_max_ranks.max_rank | int ) -%} |
| 71 | + |
| 72 | + {%- set iteration_number = i + 1 -%} |
| 73 | + |
| 74 | + {%- set filtered_sql = dbtvault.replace_placeholder_with_rank_filter(sql, rank_column, iteration_number) -%} |
| 75 | + |
| 76 | + {{ dbt_utils.log_info("Running for {} {} of {} on column '{}' [{}]".format('rank', iteration_number, min_max_ranks.max_rank, rank_column, model.unique_id)) }} |
| 77 | + |
| 78 | + {% set tmp_relation = make_temp_relation(target_relation) %} |
| 79 | + |
| 80 | + {# This call statement drops and then creates a temporary table #} |
| 81 | + {# but MSSQL will fail to drop any temporary table created by a previous loop iteration #} |
| 82 | + {# See MSSQL note and drop code below #} |
| 83 | + {% call statement() -%} |
| 84 | + {{ create_table_as(True, tmp_relation, filtered_sql) }} |
| 85 | + {%- endcall %} |
| 86 | + |
| 87 | + {{ adapter.expand_target_column_types(from_relation=tmp_relation, |
| 88 | + to_relation=target_relation) }} |
| 89 | + |
| 90 | + {%- set insert_query_name = 'main-' ~ i -%} |
| 91 | + {% call statement(insert_query_name, fetch_result=True) -%} |
| 92 | + INSERT INTO {{ target_relation }} ({{ target_cols_csv }}) |
| 93 | + ( |
| 94 | + SELECT {{ target_cols_csv }} |
| 95 | + FROM {{ tmp_relation.include(schema=True) }} |
| 96 | + ); |
| 97 | + {%- endcall %} |
| 98 | + |
| 99 | + {% set result = load_result(insert_query_name) %} |
| 100 | + {% if 'response' in result.keys() %} {# added in v0.19.0 #} |
| 101 | + {# Investigate for Databricks #} |
| 102 | + {%- if result['response']['rows_affected'] == None %} |
| 103 | + {% set rows_inserted = 0 %} |
| 104 | + {%- else %} |
| 105 | + {% set rows_inserted = result['response']['rows_affected'] %} |
| 106 | + {%- endif %} |
| 107 | + |
| 108 | + {% else %} {# older versions #} |
| 109 | + {% set rows_inserted = result['status'].split(" ")[2] | int %} |
| 110 | + {% endif %} |
| 111 | + |
| 112 | + {%- set sum_rows_inserted = loop_vars['sum_rows_inserted'] + rows_inserted -%} |
| 113 | + {%- do loop_vars.update({'sum_rows_inserted': sum_rows_inserted}) %} |
| 114 | + |
| 115 | + {{ dbt_utils.log_info("Ran for {} {} of {}; {} records inserted [{}]".format('rank', iteration_number, |
| 116 | + min_max_ranks.max_rank, |
| 117 | + rows_inserted, |
| 118 | + model.unique_id)) }} |
| 119 | + |
| 120 | + {# In databricks and sqlserver a temporary view/table can only be dropped by #} |
| 121 | + {# the connection or session that created it so drop it now before the commit below closes this session #} model.unique_id)) }} |
| 122 | + {% if target.type in ['databricks', 'sqlserver'] %} |
| 123 | + {{ dbtvault.drop_temporary_special(tmp_relation) }} |
| 124 | + {% else %} |
| 125 | + {% do to_drop.append(tmp_relation) %} |
| 126 | + {% endif %} |
| 127 | + |
| 128 | + {% do adapter.commit() %} |
| 129 | + |
| 130 | + {% endfor %} |
| 131 | + {% call noop_statement('main', "INSERT {}".format(loop_vars['sum_rows_inserted']) ) -%} |
| 132 | + {{ filtered_sql }} |
75 | 133 | {%- endcall %}
|
76 |
| - |
77 |
| - {{ adapter.expand_target_column_types(from_relation=tmp_relation, |
78 |
| - to_relation=target_relation) }} |
79 |
| - |
80 |
| - {%- set insert_query_name = 'main-' ~ i -%} |
81 |
| - {% call statement(insert_query_name, fetch_result=True) -%} |
82 |
| - INSERT INTO {{ target_relation }} ({{ target_cols_csv }}) |
83 |
| - ( |
84 |
| - SELECT {{ target_cols_csv }} |
85 |
| - FROM {{ tmp_relation.include(schema=True) }} |
86 |
| - ); |
87 |
| - {%- endcall %} |
88 |
| - |
89 |
| - {% set result = load_result(insert_query_name) %} |
90 |
| - {% if 'response' in result.keys() %} {# added in v0.19.0 #} |
91 |
| - {# Investigate for Databricks #} |
92 |
| - {%- if result['response']['rows_affected'] == None %} |
93 |
| - {% set rows_inserted = 0 %} |
94 |
| - {%- else %} |
95 |
| - {% set rows_inserted = result['response']['rows_affected'] %} |
96 |
| - {%- endif %} |
97 |
| - |
98 |
| - {% else %} {# older versions #} |
99 |
| - {% set rows_inserted = result['status'].split(" ")[2] | int %} |
100 |
| - {% endif %} |
101 |
| - |
102 |
| - {%- set sum_rows_inserted = loop_vars['sum_rows_inserted'] + rows_inserted -%} |
103 |
| - {%- do loop_vars.update({'sum_rows_inserted': sum_rows_inserted}) %} |
104 |
| - |
105 |
| - {{ dbt_utils.log_info("Ran for {} {} of {}; {} records inserted [{}]".format('rank', iteration_number, |
106 |
| - min_max_ranks.max_rank, |
107 |
| - rows_inserted, |
108 |
| - model.unique_id)) }} |
109 |
| - |
110 |
| - {# In databricks and sqlserver a temporary view/table can only be dropped by #} |
111 |
| - {# the connection or session that created it so drop it now before the commit below closes this session #} model.unique_id)) }} |
112 |
| - {% if target.type in ['databricks', 'sqlserver'] %} |
113 |
| - {{ dbtvault.drop_temporary_special(tmp_relation) }} |
114 |
| - {% else %} |
115 |
| - {% do to_drop.append(tmp_relation) %} |
116 |
| - {% endif %} |
117 |
| - |
118 |
| - {% do adapter.commit() %} |
119 |
| - |
120 |
| - {% endfor %} |
121 |
| - {% call noop_statement('main', "INSERT {}".format(loop_vars['sum_rows_inserted']) ) -%} |
122 |
| - {{ filtered_sql }} |
123 |
| - {%- endcall %} |
| 134 | + {% endif %} |
124 | 135 |
|
125 | 136 | {% endif %}
|
126 | 137 |
|
|
0 commit comments