KCSB alignment #411
GitHub Actions / Test Results
failed
Jan 5, 2025 in 0s
2 fail, 35 skipped, 275 pass in 18m 14s
Annotations
Check warning on line 0 in azure-kusto-ingest.tests.test_e2e_ingest.TestE2E
github-actions / Test Results
All 5 runs failed: test_streaming_ingest_from_blob[ManagedStreaming] (azure-kusto-ingest.tests.test_e2e_ingest.TestE2E)
artifacts/Unit Test Results (Python 3.10)/pytest.xml [took 3s]
artifacts/Unit Test Results (Python 3.11)/pytest.xml [took 2s]
artifacts/Unit Test Results (Python 3.12)/pytest.xml [took 2s]
artifacts/Unit Test Results (Python 3.8)/pytest.xml [took 2s]
artifacts/Unit Test Results (Python 3.9)/pytest.xml [took 2s]
Raw output
azure.kusto.data.exceptions.KustoApiError: Bad streaming ingestion request to fastbatchinge2e.python_test_3_10_15_1736065161_57236 : Access to persistent storage path 'https://hnykstrldsdkse2etest01.blob.core.windows.net/20250105-ingestdata-e5c334ee145d4b4-0/fastbatchinge2e__python_test_3_10_15_1736065161_57236__ca84d9f8-e869-44d1-9d22-5332f17f8a85__dataset.json' was denied (operation 'BlobPersistentStorageFile.GetProperties')
self = <azure.kusto.data.client.KustoClient object at 0x7f6864ad0b50>
endpoint = 'https://sdkse2etest.eastus.kusto.windows.net/v1/rest/ingest/fastbatchinge2e/python_test_3_10_15_1736065161_57236?streamFormat=json&mappingName=JsonMapping&sourceKind=uri'
request = <azure.kusto.data.client_base.ExecuteRequestParams object at 0x7f6864e6d210>
properties = <azure.kusto.data.client_request_properties.ClientRequestProperties object at 0x7f6864e6dcc0>
stream_response = False
def _execute(
self,
endpoint: str,
request: ExecuteRequestParams,
properties: Optional[ClientRequestProperties] = None,
stream_response: bool = False,
) -> Union[KustoResponseDataSet, Response]:
"""Executes given query against this client"""
if self._is_closed:
raise KustoClosedError()
self.validate_endpoint()
request_headers = request.request_headers
if self._aad_helper:
request_headers["Authorization"] = self._aad_helper.acquire_authorization_header()
# trace http post call for response
invoker = lambda: self._session.post(
endpoint,
headers=request_headers,
json=request.json_payload,
data=request.payload,
timeout=request.timeout.seconds,
stream=stream_response,
allow_redirects=False,
)
try:
response = MonitoredActivity.invoke(
invoker, name_of_span="KustoClient.http_post", tracing_attributes=Span.create_http_attributes("POST", endpoint, request_headers)
)
except Exception as e:
raise KustoNetworkError(endpoint, None if properties is None else properties.client_request_id) from e
if stream_response:
try:
response.raise_for_status()
if 300 <= response.status_code < 400:
raise Exception("Unexpected redirection, got status code: " + str(response.status))
return response
except Exception as e:
raise self._handle_http_error(e, self._query_endpoint, None, response, response.status_code, response.json(), response.text)
response_json = None
try:
if 300 <= response.status_code < 400:
raise Exception("Unexpected redirection, got status code: " + str(response.status))
if response.text:
response_json = response.json()
else:
raise KustoServiceError("The content of the response contains no data.", response)
> response.raise_for_status()
../../../.local/lib/python3.10/site-packages/azure/kusto/data/client.py:360:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Response [400]>
def raise_for_status(self):
"""Raises :class:`HTTPError`, if one occurred."""
http_error_msg = ""
if isinstance(self.reason, bytes):
# We attempt to decode utf-8 first because some servers
# choose to localize their reason strings. If the string
# isn't utf-8, we fall back to iso-8859-1 for all other
# encodings. (See PR #3538)
try:
reason = self.reason.decode("utf-8")
except UnicodeDecodeError:
reason = self.reason.decode("iso-8859-1")
else:
reason = self.reason
if 400 <= self.status_code < 500:
http_error_msg = (
f"{self.status_code} Client Error: {reason} for url: {self.url}"
)
elif 500 <= self.status_code < 600:
http_error_msg = (
f"{self.status_code} Server Error: {reason} for url: {self.url}"
)
if http_error_msg:
> raise HTTPError(http_error_msg, response=self)
E requests.exceptions.HTTPError: 400 Client Error: BadRequest for url: https://sdkse2etest.eastus.kusto.windows.net/v1/rest/ingest/fastbatchinge2e/python_test_3_10_15_1736065161_57236?streamFormat=json&mappingName=JsonMapping&sourceKind=uri
../../../.local/lib/python3.10/site-packages/requests/models.py:1024: HTTPError
The above exception was the direct cause of the following exception:
self = <test_e2e_ingest.TestE2E object at 0x7f68664c6bc0>
is_managed_streaming = True
@pytest.mark.asyncio
async def test_streaming_ingest_from_blob(self, is_managed_streaming):
ingestion_properties = IngestionProperties(
database=self.test_db,
table=self.test_table,
data_format=DataFormat.JSON,
ingestion_mapping_reference="JsonMapping",
ingestion_mapping_kind=IngestionMappingKind.JSON,
)
containers = self.ingest_client._resource_manager.get_containers()
with FileDescriptor(self.json_file_path).open(False) as stream:
blob_descriptor = self.ingest_client.upload_blob(
containers,
FileDescriptor(self.json_file_path),
ingestion_properties.database,
ingestion_properties.table,
stream,
None,
10 * 60,
3,
)
if is_managed_streaming:
> self.managed_streaming_ingest_client.ingest_from_blob(blob_descriptor, ingestion_properties)
azure-kusto-ingest/tests/test_e2e_ingest.py:563:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../.local/lib/python3.10/site-packages/azure/core/tracing/decorator.py:105: in wrapper_use_tracer
return func(*args, **kwargs)
../../../.local/lib/python3.10/site-packages/azure/kusto/ingest/managed_streaming_ingest_client.py:123: in ingest_from_blob
res = self._stream_with_retries(blob_descriptor.size, blob_descriptor, ingestion_properties)
../../../.local/lib/python3.10/site-packages/azure/kusto/ingest/managed_streaming_ingest_client.py:142: in _stream_with_retries
for attempt in Retrying(stop=stop_after_attempt(self._num_of_attempts), wait=wait_random_exponential(max=self._max_seconds_per_retry), reraise=True):
../../../.local/lib/python3.10/site-packages/tenacity/__init__.py:443: in __iter__
do = self.iter(retry_state=retry_state)
../../../.local/lib/python3.10/site-packages/tenacity/__init__.py:376: in iter
result = action(retry_state)
../../../.local/lib/python3.10/site-packages/tenacity/__init__.py:418: in exc_check
raise retry_exc.reraise()
../../../.local/lib/python3.10/site-packages/tenacity/__init__.py:185: in reraise
raise self.last_attempt.result()
/opt/hostedtoolcache/Python/3.10.15/x64/lib/python3.10/concurrent/futures/_base.py:451: in result
return self.__get_result()
/opt/hostedtoolcache/Python/3.10.15/x64/lib/python3.10/concurrent/futures/_base.py:403: in __get_result
raise self._exception
../../../.local/lib/python3.10/site-packages/azure/kusto/ingest/managed_streaming_ingest_client.py:151: in _stream_with_retries
return MonitoredActivity.invoke(
../../../.local/lib/python3.10/site-packages/azure/kusto/data/_telemetry.py:116: in invoke
return span()
../../../.local/lib/python3.10/site-packages/azure/core/tracing/decorator.py:105: in wrapper_use_tracer
return func(*args, **kwargs)
../../../.local/lib/python3.10/site-packages/azure/kusto/ingest/managed_streaming_ingest_client.py:150: in <lambda>
invoker = lambda: self.streaming_client.ingest_from_blob(descriptor, props, client_request_id)
../../../.local/lib/python3.10/site-packages/azure/kusto/ingest/streaming_ingest_client.py:105: in ingest_from_blob
self._kusto_client.execute_streaming_ingest(
../../../.local/lib/python3.10/site-packages/azure/core/tracing/decorator.py:105: in wrapper_use_tracer
return func(*args, **kwargs)
../../../.local/lib/python3.10/site-packages/azure/kusto/data/client.py:271: in execute_streaming_ingest
self._execute(endpoint, request, properties)
../../../.local/lib/python3.10/site-packages/azure/kusto/data/client.py:362: in _execute
raise self._handle_http_error(e, endpoint, request.payload, response, response.status_code, response_json, response.text)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
exception = HTTPError('400 Client Error: BadRequest for url: https://sdkse2etest.eastus.kusto.windows.net/v1/rest/ingest/fastbatchinge2e/python_test_3_10_15_1736065161_57236?streamFormat=json&mappingName=JsonMapping&sourceKind=uri')
endpoint = 'https://sdkse2etest.eastus.kusto.windows.net/v1/rest/ingest/fastbatchinge2e/python_test_3_10_15_1736065161_57236?streamFormat=json&mappingName=JsonMapping&sourceKind=uri'
payload = None, response = <Response [400]>, status = 400
response_json = {'error': {'@context': {'activityId': '245e9398-a38c-4d58-87e5-f7c39aa3b88e', 'activityStack': '(Activity stack: CRID=...le.GetProperties')", '@permanent': True, '@type': 'Kusto.DataNode.Exceptions.StreamingIngestionRequestException', ...}}
response_text = '{\r\n "error": {\r\n "code": "BadRequest",\r\n "message": "Request is invalid and cannot be executed...> GW.Http.CallContext/245e9398-a38c-4d58-87e5-f7c39aa3b88e)"\r\n },\r\n "@permanent": true\r\n }\r\n}'
@staticmethod
def _handle_http_error(
exception: Exception,
endpoint: Optional[str],
payload: Optional[io.IOBase],
response: "Union[Response, aiohttp.ClientResponse]",
status: int,
response_json: Any,
response_text: Optional[str],
) -> NoReturn:
if status == 404:
if payload:
raise KustoServiceError("The ingestion endpoint does not exist. Please enable streaming ingestion on your cluster.", response) from exception
raise KustoServiceError(f"The requested endpoint '{endpoint}' does not exist.", response) from exception
if status == 429:
raise KustoThrottlingError("The request was throttled by the server.", response) from exception
if status == 401:
raise KustoServiceError(f"401. Missing adequate access rights.", response) from exception
if payload:
message = f"An error occurred while trying to ingest: Status: {status}, Reason: {response.reason}, Text: {response_text}."
if response_json:
raise KustoApiError(response_json, message, response) from exception
raise KustoServiceError(message, response) from exception
if response_json:
> raise KustoApiError(response_json, http_response=response) from exception
E azure.kusto.data.exceptions.KustoApiError: Bad streaming ingestion request to fastbatchinge2e.python_test_3_10_15_1736065161_57236 : Access to persistent storage path 'https://hnykstrldsdkse2etest01.blob.core.windows.net/20250105-ingestdata-e5c334ee145d4b4-0/fastbatchinge2e__python_test_3_10_15_1736065161_57236__ca84d9f8-e869-44d1-9d22-5332f17f8a85__dataset.json' was denied (operation 'BlobPersistentStorageFile.GetProperties')
../../../.local/lib/python3.10/site-packages/azure/kusto/data/client_base.py:122: KustoApiError
Check warning on line 0 in azure-kusto-ingest.tests.test_e2e_ingest.TestE2E
github-actions / Test Results
All 5 runs failed: test_streaming_ingest_from_blob[NormalClient] (azure-kusto-ingest.tests.test_e2e_ingest.TestE2E)
artifacts/Unit Test Results (Python 3.10)/pytest.xml [took 1m 25s]
artifacts/Unit Test Results (Python 3.11)/pytest.xml [took 1m 52s]
artifacts/Unit Test Results (Python 3.12)/pytest.xml [took 1m 28s]
artifacts/Unit Test Results (Python 3.8)/pytest.xml [took 0s]
artifacts/Unit Test Results (Python 3.9)/pytest.xml [took 0s]
Raw output
azure.kusto.data.exceptions.KustoApiError: Bad streaming ingestion request to fastbatchinge2e.python_test_3_10_15_1736065161_57236 : Access to persistent storage path 'https://hnykstrldsdkse2etest01.blob.core.windows.net/20250105-ingestdata-e5c334ee145d4b4-0/fastbatchinge2e__python_test_3_10_15_1736065161_57236__ab02d50f-c4ed-41af-ad56-c01e1faad89d__dataset.json' was denied (operation 'BlobPersistentStorageFile.GetProperties')
self = <azure.kusto.data.client.KustoClient object at 0x7f6864abc7f0>
endpoint = 'https://sdkse2etest.eastus.kusto.windows.net/v1/rest/ingest/fastbatchinge2e/python_test_3_10_15_1736065161_57236?streamFormat=json&mappingName=JsonMapping&sourceKind=uri'
request = <azure.kusto.data.client_base.ExecuteRequestParams object at 0x7f6864b54310>
properties = None, stream_response = False
def _execute(
self,
endpoint: str,
request: ExecuteRequestParams,
properties: Optional[ClientRequestProperties] = None,
stream_response: bool = False,
) -> Union[KustoResponseDataSet, Response]:
"""Executes given query against this client"""
if self._is_closed:
raise KustoClosedError()
self.validate_endpoint()
request_headers = request.request_headers
if self._aad_helper:
request_headers["Authorization"] = self._aad_helper.acquire_authorization_header()
# trace http post call for response
invoker = lambda: self._session.post(
endpoint,
headers=request_headers,
json=request.json_payload,
data=request.payload,
timeout=request.timeout.seconds,
stream=stream_response,
allow_redirects=False,
)
try:
response = MonitoredActivity.invoke(
invoker, name_of_span="KustoClient.http_post", tracing_attributes=Span.create_http_attributes("POST", endpoint, request_headers)
)
except Exception as e:
raise KustoNetworkError(endpoint, None if properties is None else properties.client_request_id) from e
if stream_response:
try:
response.raise_for_status()
if 300 <= response.status_code < 400:
raise Exception("Unexpected redirection, got status code: " + str(response.status))
return response
except Exception as e:
raise self._handle_http_error(e, self._query_endpoint, None, response, response.status_code, response.json(), response.text)
response_json = None
try:
if 300 <= response.status_code < 400:
raise Exception("Unexpected redirection, got status code: " + str(response.status))
if response.text:
response_json = response.json()
else:
raise KustoServiceError("The content of the response contains no data.", response)
> response.raise_for_status()
../../../.local/lib/python3.10/site-packages/azure/kusto/data/client.py:360:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Response [400]>
def raise_for_status(self):
"""Raises :class:`HTTPError`, if one occurred."""
http_error_msg = ""
if isinstance(self.reason, bytes):
# We attempt to decode utf-8 first because some servers
# choose to localize their reason strings. If the string
# isn't utf-8, we fall back to iso-8859-1 for all other
# encodings. (See PR #3538)
try:
reason = self.reason.decode("utf-8")
except UnicodeDecodeError:
reason = self.reason.decode("iso-8859-1")
else:
reason = self.reason
if 400 <= self.status_code < 500:
http_error_msg = (
f"{self.status_code} Client Error: {reason} for url: {self.url}"
)
elif 500 <= self.status_code < 600:
http_error_msg = (
f"{self.status_code} Server Error: {reason} for url: {self.url}"
)
if http_error_msg:
> raise HTTPError(http_error_msg, response=self)
E requests.exceptions.HTTPError: 400 Client Error: BadRequest for url: https://sdkse2etest.eastus.kusto.windows.net/v1/rest/ingest/fastbatchinge2e/python_test_3_10_15_1736065161_57236?streamFormat=json&mappingName=JsonMapping&sourceKind=uri
../../../.local/lib/python3.10/site-packages/requests/models.py:1024: HTTPError
The above exception was the direct cause of the following exception:
self = <test_e2e_ingest.TestE2E object at 0x7f68664c7a90>
is_managed_streaming = False
@pytest.mark.asyncio
async def test_streaming_ingest_from_blob(self, is_managed_streaming):
ingestion_properties = IngestionProperties(
database=self.test_db,
table=self.test_table,
data_format=DataFormat.JSON,
ingestion_mapping_reference="JsonMapping",
ingestion_mapping_kind=IngestionMappingKind.JSON,
)
containers = self.ingest_client._resource_manager.get_containers()
with FileDescriptor(self.json_file_path).open(False) as stream:
blob_descriptor = self.ingest_client.upload_blob(
containers,
FileDescriptor(self.json_file_path),
ingestion_properties.database,
ingestion_properties.table,
stream,
None,
10 * 60,
3,
)
if is_managed_streaming:
self.managed_streaming_ingest_client.ingest_from_blob(blob_descriptor, ingestion_properties)
else:
> self.streaming_ingest_client.ingest_from_blob(blob_descriptor, ingestion_properties)
azure-kusto-ingest/tests/test_e2e_ingest.py:565:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../.local/lib/python3.10/site-packages/azure/kusto/ingest/streaming_ingest_client.py:105: in ingest_from_blob
self._kusto_client.execute_streaming_ingest(
../../../.local/lib/python3.10/site-packages/azure/core/tracing/decorator.py:105: in wrapper_use_tracer
return func(*args, **kwargs)
../../../.local/lib/python3.10/site-packages/azure/kusto/data/client.py:271: in execute_streaming_ingest
self._execute(endpoint, request, properties)
../../../.local/lib/python3.10/site-packages/azure/kusto/data/client.py:362: in _execute
raise self._handle_http_error(e, endpoint, request.payload, response, response.status_code, response_json, response.text)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
exception = HTTPError('400 Client Error: BadRequest for url: https://sdkse2etest.eastus.kusto.windows.net/v1/rest/ingest/fastbatchinge2e/python_test_3_10_15_1736065161_57236?streamFormat=json&mappingName=JsonMapping&sourceKind=uri')
endpoint = 'https://sdkse2etest.eastus.kusto.windows.net/v1/rest/ingest/fastbatchinge2e/python_test_3_10_15_1736065161_57236?streamFormat=json&mappingName=JsonMapping&sourceKind=uri'
payload = None, response = <Response [400]>, status = 400
response_json = {'error': {'@context': {'activityId': '55e2a63f-cbda-49f3-be5d-f25ad35cacd8', 'activityStack': '(Activity stack: CRID=...le.GetProperties')", '@permanent': True, '@type': 'Kusto.DataNode.Exceptions.StreamingIngestionRequestException', ...}}
response_text = '{\r\n "error": {\r\n "code": "BadRequest",\r\n "message": "Request is invalid and cannot be executed...> GW.Http.CallContext/55e2a63f-cbda-49f3-be5d-f25ad35cacd8)"\r\n },\r\n "@permanent": true\r\n }\r\n}'
@staticmethod
def _handle_http_error(
exception: Exception,
endpoint: Optional[str],
payload: Optional[io.IOBase],
response: "Union[Response, aiohttp.ClientResponse]",
status: int,
response_json: Any,
response_text: Optional[str],
) -> NoReturn:
if status == 404:
if payload:
raise KustoServiceError("The ingestion endpoint does not exist. Please enable streaming ingestion on your cluster.", response) from exception
raise KustoServiceError(f"The requested endpoint '{endpoint}' does not exist.", response) from exception
if status == 429:
raise KustoThrottlingError("The request was throttled by the server.", response) from exception
if status == 401:
raise KustoServiceError(f"401. Missing adequate access rights.", response) from exception
if payload:
message = f"An error occurred while trying to ingest: Status: {status}, Reason: {response.reason}, Text: {response_text}."
if response_json:
raise KustoApiError(response_json, message, response) from exception
raise KustoServiceError(message, response) from exception
if response_json:
> raise KustoApiError(response_json, http_response=response) from exception
E azure.kusto.data.exceptions.KustoApiError: Bad streaming ingestion request to fastbatchinge2e.python_test_3_10_15_1736065161_57236 : Access to persistent storage path 'https://hnykstrldsdkse2etest01.blob.core.windows.net/20250105-ingestdata-e5c334ee145d4b4-0/fastbatchinge2e__python_test_3_10_15_1736065161_57236__ab02d50f-c4ed-41af-ad56-c01e1faad89d__dataset.json' was denied (operation 'BlobPersistentStorageFile.GetProperties')
../../../.local/lib/python3.10/site-packages/azure/kusto/data/client_base.py:122: KustoApiError
Loading