Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

783 fix download authentication feeds #867

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 151 additions & 79 deletions functions-python/helpers/tests/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,88 +65,160 @@ def test_download_and_get_hash(self):
if os.path.exists(file_path):
os.remove(file_path)

def test_download_and_get_hash_auth_type_1(self):
mock_binary_data = b"binary data for auth type 1"
expected_hash = hashlib.sha256(mock_binary_data).hexdigest()
file_path = "test_file.txt"
url = "https://test.com"
api_key_parameter_name = "Authorization"
credentials = "Bearer token123"

mock_response = MagicMock()
mock_response.read.side_effect = [mock_binary_data, b""]
mock_response.__enter__.return_value = mock_response

with patch(
"urllib3.PoolManager.request", return_value=mock_response
) as mock_request:
result_hash = download_and_get_hash(
url, file_path, "sha256", 8192, 1, api_key_parameter_name, credentials
)
# def test_download_and_get_hash_auth_type_1(self):
# mock_binary_data = b"binary data for auth type 1"
# expected_hash = hashlib.sha256(mock_binary_data).hexdigest()
# file_path = "test_file.txt"
# url = "https://test.com"
# api_key_parameter_name = "Authorization"
# credentials = "Bearer token123"

# mock_response = MagicMock()
# mock_response.read.side_effect = [mock_binary_data, b""]
# mock_response.__enter__.return_value = mock_response

# with patch(
# "urllib3.PoolManager.request", return_value=mock_response
# ) as mock_request:
# result_hash = download_and_get_hash(
# url, file_path, "sha256", 8192, 1, api_key_parameter_name, credentials
# )

# self.assertEqual(
# result_hash,
# expected_hash,
# msg=f"Hash mismatch: got {result_hash},"
# f" but expected {expected_hash}",
# )

# mock_request.assert_called_with(
# "GET",
# url,
# preload_content=False,
# headers={
# "User-Agent": expected_user_agent,
# api_key_parameter_name: credentials,
# },
# )

# if os.path.exists(file_path):
# os.remove(file_path)

# def test_download_and_get_hash_auth_type_2(self):
# mock_binary_data = b"binary data for auth type 2"
# expected_hash = hashlib.sha256(mock_binary_data).hexdigest()
# file_path = "test_file.txt"
# base_url = "https://test.com"
# api_key_parameter_name = "api_key"
# credentials = "key123"

# modified_url = f"{base_url}?{api_key_parameter_name}={credentials}"

# mock_response = MagicMock()
# mock_response.read.side_effect = [mock_binary_data, b""]
# mock_response.__enter__.return_value = mock_response

# with patch(
# "urllib3.PoolManager.request", return_value=mock_response
# ) as mock_request:
# result_hash = download_and_get_hash(
# base_url,
# file_path,
# "sha256",
# 8192,
# 2,
# api_key_parameter_name,
# credentials,
# )

# self.assertEqual(
# result_hash,
# expected_hash,
# msg=f"Hash mismatch: got {result_hash},"
# f" but expected {expected_hash}",
# )

# mock_request.assert_called_with(
# "GET",
# modified_url,
# preload_content=False,
# headers={"User-Agent": expected_user_agent},
# )

# if os.path.exists(file_path):
# os.remove(file_path)

self.assertEqual(
result_hash,
expected_hash,
msg=f"Hash mismatch: got {result_hash},"
f" but expected {expected_hash}",
)

mock_request.assert_called_with(
"GET",
url,
preload_content=False,
headers={
"User-Agent": expected_user_agent,
api_key_parameter_name: credentials,
},
)

if os.path.exists(file_path):
os.remove(file_path)

def test_download_and_get_hash_auth_type_2(self):
mock_binary_data = b"binary data for auth type 2"
expected_hash = hashlib.sha256(mock_binary_data).hexdigest()
file_path = "test_file.txt"
base_url = "https://test.com"
def test_download_and_get_hash_auth_type_1(self):
url = "http://example.com/data"
api_key_parameter_name = "api_key"
credentials = "key123"

modified_url = f"{base_url}?{api_key_parameter_name}={credentials}"

mock_response = MagicMock()
mock_response.read.side_effect = [mock_binary_data, b""]
mock_response.__enter__.return_value = mock_response

with patch(
"urllib3.PoolManager.request", return_value=mock_response
) as mock_request:
result_hash = download_and_get_hash(
base_url,
file_path,
"sha256",
8192,
2,
api_key_parameter_name,
credentials,
)
credentials = "test_api_key"
expected_url = f"{url}?{api_key_parameter_name}={credentials}"

with patch("helpers.download.create_urllib3_context") as mock_create_context:
mock_context = mock_create_context.return_value
mock_context.load_default_certs.return_value = None

with patch("helpers.download.urllib3.PoolManager") as mock_pool_manager:
mock_http = mock_pool_manager.return_value
mock_response = mock_http.request.return_value
mock_response.read.side_effect = [b"data_chunk_1", b"data_chunk_2", b""]
mock_response.__enter__.return_value = mock_response

result = download_and_get_hash(
url,
file_path="test_file",
hash_algorithm="sha256",
authentication_type=1,
api_key_parameter_name=api_key_parameter_name,
credentials=credentials,
)

mock_http.request.assert_called_with(
"GET",
expected_url,
preload_content=False,
headers={
"User-Agent": "Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/126.0.0.0 Mobile Safari/537.36"
},
)
assert result is not None

self.assertEqual(
result_hash,
expected_hash,
msg=f"Hash mismatch: got {result_hash},"
f" but expected {expected_hash}",
)

mock_request.assert_called_with(
"GET",
modified_url,
preload_content=False,
headers={"User-Agent": expected_user_agent},
)

if os.path.exists(file_path):
os.remove(file_path)
def test_download_and_get_hash_auth_type_2(self):
url = "http://example.com/data"
api_key_parameter_name = "Authorization"
credentials = "Bearer test_token"
expected_headers = {
"User-Agent": "Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/126.0.0.0 Mobile Safari/537.36",
api_key_parameter_name: credentials,
}

with patch("helpers.download.create_urllib3_context") as mock_create_context:
mock_context = mock_create_context.return_value
mock_context.load_default_certs.return_value = None

with patch("helpers.download.urllib3.PoolManager") as mock_pool_manager:
mock_http = mock_pool_manager.return_value
mock_response = mock_http.request.return_value
mock_response.read.side_effect = [b"data_chunk_1", b"data_chunk_2", b""]
mock_response.__enter__.return_value = mock_response

result = download_and_get_hash(
url,
file_path="test_file",
hash_algorithm="sha256",
authentication_type=2,
api_key_parameter_name=api_key_parameter_name,
credentials=credentials,
)

mock_http.request.assert_called_with(
"GET", url, preload_content=False, headers=expected_headers
)
assert result is not None

def test_download_and_get_hash_exception(self):
file_path = "test_file.txt"
Expand Down
8 changes: 4 additions & 4 deletions functions-python/helpers/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,18 +100,18 @@ def download_and_get_hash(
ctx.load_default_certs()
ctx.options |= 0x4 # ssl.OP_LEGACY_SERVER_CONNECT

# authentication_type == 1 -> the credentials are passed in the header
# authentication_type == 1 -> the credentials are passed in the url
headers = {
"User-Agent": "Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/126.0.0.0 Mobile Safari/537.36"
}
if authentication_type == 1 and api_key_parameter_name and credentials:
headers[api_key_parameter_name] = credentials
url += f"?{api_key_parameter_name}={credentials}"

# authentication_type == 2 -> the credentials are passed in the url
# authentication_type == 2 -> the credentials are passed in the header
if authentication_type == 2 and api_key_parameter_name and credentials:
url += f"?{api_key_parameter_name}={credentials}"
headers[api_key_parameter_name] = credentials

with urllib3.PoolManager(ssl_context=ctx) as http:
with http.request(
Expand Down
Loading