Commit 1e4d456

SNOW-1778088 azure md5 (#2102)
1 parent 48fba63 commit 1e4d456

File tree

4 files changed (+72, -2 lines)

DESCRIPTION.md

Lines changed: 3 additions & 0 deletions
@@ -8,6 +8,9 @@ Source code is also available at: https://github.com/snowflakedb/snowflake-conne
 
 # Release Notes
 
+- v3.12.4(TBD)
+  - Fixed a bug where multipart uploads to Azure would be missing their MD5 hashes.
+
 - v3.12.3(October 25,2024)
   - Improved the error message for SSL-related issues to provide clearer guidance when an SSL error occurs.
   - Improved error message for SQL execution cancellations caused by timeout.
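For context, the bug this release note fixes surfaced as a missing md5 value in LIST output for files uploaded to an Azure stage in multiple parts. A hedged sketch of how one might observe the symptom; the connection parameters and my_stage are illustrative, not part of this commit:

```python
# Hypothetical repro sketch: inspect the md5 column that LIST returns.
# Before this fix, md5 could be None for multipart uploads to Azure stages.
import snowflake.connector

connection_params = {"account": "...", "user": "...", "password": "..."}  # placeholders
with snowflake.connector.connect(**connection_params) as conn:
    with conn.cursor() as cur:
        # LIST rows are (name, size, md5, last_modified)
        for name, size, md5, last_modified in cur.execute("LS @my_stage").fetchall():
            print(name, md5)
```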

src/snowflake/connector/azure_storage_client.py

Lines changed: 24 additions & 2 deletions
@@ -4,6 +4,7 @@
 
 from __future__ import annotations
 
+import base64
 import json
 import os
 import xml.etree.ElementTree as ET
@@ -17,6 +18,7 @@
 from .constants import FileHeader, ResultStatus
 from .encryption_util import EncryptionMetadata
 from .storage_client import SnowflakeStorageClient
+from .util_text import get_md5
 from .vendored import requests
 
 if TYPE_CHECKING:  # pragma: no cover
@@ -149,7 +151,7 @@ def get_file_header(self, filename: str) -> FileHeader | None:
                 )
             )
             return FileHeader(
-                digest=r.headers.get("x-ms-meta-sfcdigest"),
+                digest=r.headers.get(SFCDIGEST),
                 content_length=int(r.headers.get("Content-Length")),
                 encryption_metadata=encryption_metadata,
             )
@@ -236,7 +238,27 @@ def _complete_multipart_upload(self) -> None:
             part = ET.Element("Latest")
             part.text = block_id
             root.append(part)
-        headers = {"x-ms-blob-content-encoding": "utf-8"}
+        # SNOW-1778088: We need to calculate the MD5 sum of this file for Azure Blob storage
+        new_stream = not bool(self.meta.src_stream or self.meta.intermediate_stream)
+        fd = (
+            self.meta.src_stream
+            or self.meta.intermediate_stream
+            or open(self.meta.real_src_file_name, "rb")
+        )
+        try:
+            if not new_stream:
+                # Reset position in file
+                fd.seek(0)
+            file_content = fd.read()
+        finally:
+            if new_stream:
+                fd.close()
+        headers = {
+            "x-ms-blob-content-encoding": "utf-8",
+            "x-ms-blob-content-md5": base64.b64encode(get_md5(file_content)).decode(
+                "utf-8"
+            ),
+        }
         azure_metadata = self._prepare_file_metadata()
         headers.update(azure_metadata)
         retry_id = "COMPLETE"
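The crux of the change: Azure does not compute a blob-level Content-MD5 for block blobs committed via Put Block List, so the connector now hashes the source itself and passes the digest as x-ms-blob-content-md5 (base64 of the raw 16-byte digest) when committing the upload. A minimal standalone sketch of the same header construction, assuming a local file path; md5_headers_for is an illustrative helper, not connector API:

```python
import base64
import hashlib


def md5_headers_for(path: str) -> dict[str, str]:
    """Build blob-property headers like those sent with the block-list commit."""
    with open(path, "rb") as fd:
        digest = hashlib.md5(fd.read()).digest()  # raw 16-byte MD5 digest
    return {
        "x-ms-blob-content-encoding": "utf-8",
        # Azure stores this value as the blob's Content-MD5 property
        "x-ms-blob-content-md5": base64.b64encode(digest).decode("utf-8"),
    }


print(md5_headers_for(__file__))
```

Like the diff, this reads the whole file into memory before hashing; feeding hashlib.md5 in fixed-size chunks would bound memory for very large uploads.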

src/snowflake/connector/util_text.py

Lines changed: 9 additions & 0 deletions
@@ -5,6 +5,7 @@
 
 from __future__ import annotations
 
+import hashlib
 import logging
 import random
 import re
@@ -289,3 +290,11 @@ def random_string(
     """
     random_part = "".join([random.Random().choice(choices) for _ in range(length)])
     return "".join([prefix, random_part, suffix])
+
+
+def get_md5(text: str | bytes) -> bytes:
+    if isinstance(text, str):
+        text = text.encode("utf-8")
+    md5 = hashlib.md5()
+    md5.update(text)
+    return md5.digest()
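A quick usage check of the new helper; the expected hex string below is the standard MD5 test vector for "abc":

```python
from snowflake.connector.util_text import get_md5

# str input is UTF-8 encoded before hashing, so these two calls agree
assert get_md5("abc") == get_md5(b"abc")
# get_md5 returns the raw 16-byte digest; its hex form matches MD5("abc")
assert get_md5(b"abc").hex() == "900150983cd24fb0d6963f7d28e17f72"
```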

test/integ/test_put_get.py

Lines changed: 36 additions & 0 deletions
@@ -791,3 +791,39 @@ def test_get_multiple_files_with_same_name(tmp_path, conn_cnx, caplog):
             # This is expected flakiness
             pass
     assert "Downloading multiple files with the same name" in caplog.text
+
+
+@pytest.mark.skipolddriver
+def test_put_md5(tmp_path, conn_cnx):
+    """This test uploads a single-part and a multipart file and makes sure that md5 is populated."""
+    # Generate random files and folders
+    small_folder = tmp_path / "small"
+    big_folder = tmp_path / "big"
+    small_folder.mkdir()
+    big_folder.mkdir()
+    generate_k_lines_of_n_files(3, 1, tmp_dir=str(small_folder))
+    # This generates an ~342M file; we want the file big enough to trigger a multipart upload
+    generate_k_lines_of_n_files(3_000_000, 1, tmp_dir=str(big_folder))
+
+    small_test_file = small_folder / "file0"
+    big_test_file = big_folder / "file0"
+
+    stage_name = random_string(5, "test_put_md5_")
+    with conn_cnx() as cnx:
+        with cnx.cursor() as cur:
+            cur.execute(f"create temporary stage {stage_name}")
+            small_filename_in_put = str(small_test_file).replace("\\", "/")
+            big_filename_in_put = str(big_test_file).replace("\\", "/")
+            cur.execute(
+                f"PUT 'file://{small_filename_in_put}' @{stage_name}/small AUTO_COMPRESS = FALSE"
+            )
+            cur.execute(
+                f"PUT 'file://{big_filename_in_put}' @{stage_name}/big AUTO_COMPRESS = FALSE"
+            )
+
+            assert all(
+                map(
+                    lambda e: e[2] is not None,
+                    cur.execute(f"LS @{stage_name}").fetchall(),
+                )
+            )
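The closing assertion indexes column 2 of the LIST output, which is the md5 column (rows are name, size, md5, last_modified). An equivalent, more explicit spelling, illustrative only, reusing cur and stage_name from the test:

```python
for name, size, md5, last_modified in cur.execute(f"LS @{stage_name}").fetchall():
    assert md5 is not None, f"{name} is missing its MD5 hash"
```

The test checks only that a digest is present rather than comparing it to a locally computed hash, presumably because the bytes actually stored on the stage (for example, after stage-side encryption) can differ from the local source file.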
