|
| 1 | +import binascii |
1 | 2 | import codecs
|
2 | 3 | import datetime
|
3 | 4 | from email.message import Message
|
4 | 5 | from typing import Any, Callable, Optional, TypeVar, Union
|
5 | 6 |
|
6 |
| -from tls_requests.exceptions import HTTPError |
| 7 | +from tls_requests.exceptions import Base64DecodeError, HTTPError |
7 | 8 | from tls_requests.models.cookies import Cookies
|
8 | 9 | from tls_requests.models.encoders import StreamEncoder
|
9 | 10 | from tls_requests.models.headers import Headers
|
|
12 | 13 | from tls_requests.models.tls import TLSResponse
|
13 | 14 | from tls_requests.settings import CHUNK_SIZE
|
14 | 15 | from tls_requests.types import CookieTypes, HeaderTypes, ResponseHistory
|
15 |
| -from tls_requests.utils import chardet, to_json |
| 16 | +from tls_requests.utils import b64decode, chardet, to_json |
16 | 17 |
|
17 | 18 | __all__ = ["Response"]
|
18 | 19 |
|
@@ -231,10 +232,21 @@ async def aclose(self) -> None:
|
231 | 232 | return self.close()
|
232 | 233 |
|
233 | 234 | @classmethod
|
234 |
| - def from_tls_response(cls, response: TLSResponse) -> "Response": |
| 235 | + def from_tls_response(cls, response: TLSResponse, is_byte_response: bool = False) -> "Response": |
| 236 | + def _parse_response_body(value: Optional[str]) -> bytes: |
| 237 | + if value: |
| 238 | + if is_byte_response: |
| 239 | + try: |
| 240 | + value = b64decode(value.split(",")[-1]) |
| 241 | + return value |
| 242 | + except (binascii.Error, AssertionError): |
| 243 | + raise Base64DecodeError("Couldn't decode the base64 string into bytes.") |
| 244 | + return value.encode("utf-8") |
| 245 | + return b"" |
| 246 | + |
235 | 247 | ret = cls(
|
236 | 248 | status_code=response.status,
|
237 |
| - body=response.body.encode("utf-8") if response.body else b"", |
| 249 | + body=_parse_response_body(response.body), |
238 | 250 | headers=response.headers,
|
239 | 251 | cookies=response.cookies,
|
240 | 252 | )
|
|
0 commit comments