-
-
Notifications
You must be signed in to change notification settings - Fork 29
Open
Description
In my project I needed to mock COPY INTO statements so that queries could produce Parquet files in a local stage path. To achieve this, I extended fakesnow.cursor.FakeSnowflakeCursor and overrode execute as follows:
def execute(self, command: str, params: Sequence[Any] | dict[Any, Any] | None = None) -> Self:
if self._stage_path is not None:
formatted_command = command % params if params else command
parsed_command = sqlglot.parse_one(formatted_command, read='snowflake')
if isinstance(parsed_command, sqlglot.expressions.Copy):
inner_sql = parsed_command.args['files'][0].this
self._cursor.execute(inner_sql.sql(dialect='snowflake'), params)
self._write_parquet(parsed_command, self._stage_path)
self._results = []
return self
self._cursor.execute(command, params)
self._results = self._cursor.fetchall()
    return self

The `_write_parquet` helper inspects the COPY INTO parameters and writes the results to a Parquet file. Only Parquet is supported today, but it could be extended in a similar way to handle other formats:
def _write_parquet(
    self, copy_command: sqlglot.expressions.Copy, stage_path: pathlib.Path
) -> None:
    """Turn the rows pending on ``self._cursor`` into a Parquet table for a stage file.

    The target file name is taken from the COPY target (leading ``@``
    removed) and resolved under *stage_path*. ``TYPE`` and ``COMPRESSION``
    are read from the statement's ``FILE_FORMAT`` parameter and default to
    ``parquet`` and ``snappy``.

    Args:
        copy_command: Parsed ``COPY INTO`` expression.
        stage_path: Local directory standing in for the stage root.

    Raises:
        ValueError: If ``FILE_FORMAT`` requests a type other than parquet.
    """
    stage_target = str(copy_command.this.this.this).lstrip('@')
    file_type = 'parquet'
    compression: _PARQUET_COMPRESSION_TYPES = 'snappy'

    # ``or []`` also covers an arg that is present but set to None.
    for param in copy_command.args.get('params') or []:
        if str(param.this.this).upper() != 'FILE_FORMAT':
            continue
        for expr in param.args.get('expressions') or []:
            prop_name = str(expr.this.this).upper()
            # Drop surrounding quotes from the option value.
            prop_value = str(expr.args['value'].this).strip('"\'')
            if prop_name == 'TYPE':
                file_type = prop_value.lower()
            elif prop_name == 'COMPRESSION':
                compression = typing.cast(_PARQUET_COMPRESSION_TYPES, prop_value.lower())

    if file_type != 'parquet':
        raise ValueError(f'Only parquet is supported by the mock. Got TYPE={file_type!r}')

    # Column names come from the cursor description; fall back to a single
    # placeholder name when the cursor reports none.
    columns = (
        [d[0] for d in self._cursor.description] if self._cursor.description else ['_COL_0']
    )
    fetched_rows = self._cursor.fetchall()
    if fetched_rows:
        if self._cursor._use_dict_result:
            # Dict rows: order the values by the column list.
            rows = [
                tuple(r.get(c) for c in columns)
                for r in typing.cast(Sequence[dict[str, Any]], fetched_rows)
            ]
        else:
            rows = [tuple(r) for r in fetched_rows]
        # Transpose row tuples into one array per column.
        table = pyarrow.Table.from_arrays(
            [pyarrow.array(column) for column in zip(*rows, strict=False)],
            names=columns,
        )
    else:
        # No rows: keep the column names, with empty null-typed arrays.
        table = pyarrow.table({c: pyarrow.array([], type=pyarrow.null()) for c in columns})

    file_path = stage_path.joinpath(stage_target).with_suffix(f'.{file_type}')
    file_path.parent.mkdir(parents=True, exist_ok=True)
    pyarrow.parquet.write_table(table, file_path, compression=compression)

This approach makes it possible to mock COPY INTO behavior realistically during tests, producing actual Parquet files in a temporary stage directory.
Would you be open to a PR that adds this functionality (or something along these lines) directly into FakeSnowflakeCursor?
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels