diff --git a/tornado/httputil.py b/tornado/httputil.py
index b21d8046c4..06892a2f3e 100644
--- a/tornado/httputil.py
+++ b/tornado/httputil.py
@@ -849,6 +849,285 @@ def parse_multipart_form_data(
         arguments.setdefault(name, []).append(value)
 
 
+_BOUNDARY_REGEX = re.compile(r'boundary="?(?P<boundary>[^"]+)"?')
+"""Regex to match the boundary option."""
+
+_1MB = 1048576
+"""Number of bytes in 1 megabyte."""
+
+
+class AbstractFileDelegate:
+    def start_file(self, name: str, headers: HTTPHeaders) -> Optional[Awaitable[None]]:
+        pass
+
+    def file_data_received(self, name: str, data: bytes) -> Optional[Awaitable[None]]:
+        pass
+
+    def finish_file(self, name: str) -> Optional[Awaitable[None]]:
+        pass
+
+
+class ParserState:
+
+    PARSE_BOUNDARY_LINE = 1
+    """State that parses the initial boundary."""
+
+    PARSE_FILE_HEADERS = 2
+    """State that parses the 'headers' for the next file/object."""
+
+    PARSE_BODY = 3
+    """State that parses some body text."""
+
+    PARSING_DONE = 4
+    """State that denotes the parser is finished."""
+
+
+class StreamingMultipartFormDataParser(object):
+    """Basic parser that accepts data and parses it into distinct files.
+
+    This parser handles the 'multipart/form-data' Content-Type, which
+    permits multiple file uploads in a single request.
+    """
+
+    @classmethod
+    def from_content_type_header(
+        cls, delegate: AbstractFileDelegate, header: Union[str, bytes]
+    ) -> "StreamingMultipartFormDataParser":
+        if isinstance(header, bytes):
+            header = header.decode("utf-8")
+        # Make sure the header is multipart/form-data.
+        parts = [part.strip() for part in header.split(";")]
+        if parts[0].lower() != "multipart/form-data":
+            raise ValueError("Invalid Content-Type: {}".format(parts[0]))
+
+        # Search for the 'boundary=' option.
+        for part in parts:
+            m = _BOUNDARY_REGEX.match(part)
+            if m:
+                boundary = m.group("boundary")
+                return cls(delegate, boundary)
+        raise ValueError("Required 'boundary' option not found in header!")
+
+    def __init__(
+        self, delegate: AbstractFileDelegate, boundary: str, max_buffer_size: int = _1MB
+    ):
+        """Create a StreamingMultipartFormDataParser.
+
+        This parser (asynchronously) receives data, parses it, and invokes
+        the given delegate appropriately.
+        """
+        # Be nice and decode the boundary if it is a bytes object.
+        if isinstance(boundary, bytes):
+            boundary = boundary.decode("utf-8")
+        # Store the delegate to write out the data.
+        self._delegate = delegate
+        self._boundary = boundary
+        self._max_buffer_size = max_buffer_size
+        self._name = None
+
+        # Variables to store the current state of the parser.
+        self._state = ParserState.PARSE_BOUNDARY_LINE
+        self._buffer = bytearray()
+
+        # Variables to hold the boundary matches.
+        self._boundary_next = "--{}\r\n".format(self._boundary).encode()
+        self._boundary_end = "--{}--\r\n".format(self._boundary).encode()
+        self._boundary_base = self._boundary_next[:-2]
+
+        # Variables for caching boundary matching.
+        self._last_idx = 0
+        self._boundary_idx = 0
+
+    @property
+    def boundary(self) -> str:
+        """Return the boundary text that denotes the end of a file."""
+        return self._boundary
+
+    def _change_state(self, state: int, name: Optional[str] = None) -> None:
+        """Helper to change the state of the parser.
+
+        This also clears some variables used in different states.
+        """
+        self._state = state
+        self._last_idx = 0
+        self._boundary_idx = 0
+        self._name = name
+
+    async def data_received(self, chunk: bytes) -> None:
+        # Process the data received, based on the current state.
+        #
+        # It is possible for 'chunk' here to be larger than the maximum
+        # buffer size.
+        # Initially, this is okay because we still need to process the
+        # chunk. However, if the buffer _remains_ this large after going
+        # through the rest of this call, the input is bad, since each
+        # state should incrementally consume data from the buffer to
+        # constrain its size.
+        if len(self._buffer) > self._max_buffer_size:
+            raise ValueError(
+                "Buffer grew larger than: {} bytes!".format(self._max_buffer_size)
+            )
+        # Do not extend the buffer at all once parsing is done.
+        if self._state != ParserState.PARSING_DONE:
+            self._buffer.extend(chunk)
+
+        # Iterate while there is sufficient data in the buffer. Each pass
+        # should either consume data, or move to a state where not enough
+        # data is available, in which case this should exit to await more
+        # data.
+        while True:
+            # PARSE_BODY state --> Expecting to parse the file contents.
+            if self._state == ParserState.PARSE_BODY:
+                # Search for the boundary characters.
+                idx = self._buffer.find(b"-")
+                if idx < 0:
+                    # No match against any boundary character. Write out
+                    # the whole buffer.
+                    data = self._buffer
+                    self._buffer = bytearray()
+                    fut = self._delegate.file_data_received(self._name, data)
+                    if fut is not None:
+                        await fut
+
+                    # Return because the whole buffer was written out.
+                    return
+
+                # If 'idx > 0', write the data _up to_ this boundary point,
+                # then proceed in the same manner as 'idx == 0'.
+                if idx > 0:
+                    # Write out all of the data _up to_ this boundary
+                    # point, then cycle around to check whether we are at
+                    # the boundary or not. This simplifies the logic for
+                    # checking against the boundary cases.
+                    data = self._buffer[:idx]
+                    self._buffer = self._buffer[idx:]
+                    fut = self._delegate.file_data_received(self._name, data)
+                    if fut is not None:
+                        await fut
+
+                # Not enough data (technically) to check against. Wait for
+                # more data to be certain whether the boundary was parsed.
+                if len(self._buffer) < len(self._boundary_next):
+                    return
+
+                # If the buffer starts with 'self._boundary_next', the
+                # current file is done; switch states and let the
+                # PARSE_BOUNDARY_LINE state consume the boundary line.
+                if self._buffer.startswith(self._boundary_next):
+                    # Mark the current file as finished.
+                    fut = self._delegate.finish_file(self._name)
+                    if fut is not None:
+                        await fut
+                    self._change_state(ParserState.PARSE_BOUNDARY_LINE)
+                    continue
+
+                # Check the end boundary as well. The end boundary _might_
+                # match if 'self._boundary_base' matches but
+                # 'self._boundary_next' does not. Wait for more data if the
+                # buffer does not have enough data to be sure.
+                if len(self._buffer) < len(self._boundary_end):
+                    return
+
+                if self._buffer.startswith(self._boundary_end):
+                    fut = self._delegate.finish_file(self._name)
+                    if fut is not None:
+                        await fut
+                    self._change_state(ParserState.PARSE_BOUNDARY_LINE)
+                    continue
+
+                # No match so far, so write out the data up to the next
+                # boundary delimiter.
+                next_idx = self._buffer.find(b"-", 1)
+                if next_idx < 0:
+                    data = self._buffer
+                    self._buffer = bytearray()
+                else:
+                    data = self._buffer[:next_idx]
+                    self._buffer = self._buffer[next_idx:]
+                fut = self._delegate.file_data_received(self._name, data)
+                if fut is not None:
+                    await fut
+
+                # Continue and run the check after this update.
+                continue
+
+            # PARSE_BOUNDARY_LINE state --> Expecting to parse either:
+            #  - self._boundary_next (for the next file)
+            #  - self._boundary_end (for the end of the request)
+            if self._state == ParserState.PARSE_BOUNDARY_LINE:
+                # Parse the first boundary chunk.
+                if len(self._buffer) < len(self._boundary_next):
+                    # Not enough data, so exit.
+                    return
+                # This implies we are parsing another file, so transition
+                # to the PARSE_FILE_HEADERS state. Also, continue to run
+                # through the loop again with the new state.
+                if self._buffer.startswith(self._boundary_next):
+                    self._buffer = self._buffer[len(self._boundary_next) :]
+                    self._change_state(ParserState.PARSE_FILE_HEADERS)
+                    continue
+                # Check against 'self._boundary_end' as well. There is a
+                # slim chance that we are at the self._boundary_end case
+                # but still do not have enough data, so handle that here.
+                if len(self._buffer) < len(self._boundary_end):
+                    # Hope we get more data to confirm the boundary-end
+                    # case.
+                    return
+                elif self._buffer.startswith(self._boundary_end):
+                    # Done parsing. We should probably sanity-check that
+                    # all data was consumed.
+                    self._buffer = self._buffer[len(self._boundary_end) :]
+                    self._change_state(ParserState.PARSING_DONE)
+                    continue
+                else:
+                    # Neither boundary matches even though enough data is
+                    # buffered; the input is malformed. Raise instead of
+                    # looping forever over the same buffer contents.
+                    gen_log.warning("Invalid boundary parsed!")
+                    raise ValueError("Invalid boundary parsed!")
+
+            # PARSE_FILE_HEADERS state --> Expecting to parse headers,
+            # terminated by a blank CRLF line.
+            if self._state == ParserState.PARSE_FILE_HEADERS:
+                idx = self._buffer.find(b"\r\n\r\n", self._last_idx)
+                # 'idx < 0' implies no match. Update the next index to
+                # search from to:
+                #     max(0, len(buffer) - 3)
+                # as an optimization to speed up future comparisons. This
+                # should work: if there is no match, then the buffer could
+                # (in the worst case) end with '\r\n\r' but not the final
+                # '\n', so we might need to rescan the previous 3
+                # characters, but not 4. (Cap at 0 in case the buffer is
+                # too small for some reason.)
+                #
+                # In any case, there is not enough data, so just exit.
+                if idx < 0:
+                    self._last_idx = max(0, len(self._buffer) - 3)
+                    return
+                # Otherwise, we have a match. Parse this into a dictionary
+                # of headers and pass the result to create a new file.
+                data = self._buffer[: idx + 4].decode("utf-8")
+                self._buffer = self._buffer[idx + 4 :]
+                headers = HTTPHeaders.parse(data)
+                _, plist = _parse_header(headers.get("Content-Disposition", ""))
+                name = plist.get("name")
+
+                # Call the delegate with the new file.
+                fut = self._delegate.start_file(name, headers=headers)
+                if fut is not None:
+                    await fut
+
+                # Update the buffer and the state.
+                self._change_state(ParserState.PARSE_BODY, name=name)
+                continue
+
+            # PARSING_DONE state --> Expect no more data; exit the loop.
+            if self._state == ParserState.PARSING_DONE:
+                if len(self._buffer) > 0:
+                    # WARNING: Data is left in the buffer when we should
+                    # be finished...
+                    gen_log.warning(
+                        "Finished with non-empty buffer (%s bytes remaining).",
+                        len(self._buffer),
+                    )
+                    self._buffer.clear()
+
+                # Even if there is data remaining, we should exit the loop.
+                return
+
+
 def format_timestamp(
     ts: Union[int, float, tuple, time.struct_time, datetime.datetime]
 ) -> str:
diff --git a/tornado/test/httputil_test.py b/tornado/test/httputil_test.py
index aa9b6ee253..261448924f 100644
--- a/tornado/test/httputil_test.py
+++ b/tornado/test/httputil_test.py
@@ -9,6 +9,8 @@
     qs_to_qsl,
     HTTPInputError,
     HTTPFile,
+    StreamingMultipartFormDataParser,
+    AbstractFileDelegate,
 )
 from tornado.escape import utf8, native_str
 from tornado.log import gen_log
@@ -23,7 +25,7 @@
 import urllib.parse
 import unittest
 
-from typing import Tuple, Dict, List
+from typing import Tuple, Dict, List, Optional
 
 
 def form_data_args() -> Tuple[Dict[str, List[bytes]], Dict[str, List[HTTPFile]]]:
@@ -260,6 +262,190 @@ def test_data_after_final_boundary(self):
         self.assertEqual(file["body"], b"Foo")
 
 
+MULTIPART_DATA = b"""----boundarything\r
+Content-Disposition: form-data; name="a.txt"\r
+\r
+a----boundarything\r
+Content-Disposition: form-data; name="b.csv"\r
+Content-Type: text/csv\r
+\r
+col1,col2
+a,b
+--boundarythin,thatwasclose
+----boundarything--\r
+"""
+
+
+class MemoryFileDelegate(AbstractFileDelegate):
+    """Basic file delegate that stores its contents in memory."""
+
+    def __init__(self):
+        super().__init__()
+        self._file_mapping = {}
+
+        self._curr_file = None
+        self._buffer = bytearray()
+        self._headers = None
+
+    @property
+    def keys(self):
+        return list(self._file_mapping.keys())
+
+    def start_file(self, name: str, headers: HTTPHeaders):
+        self._curr_file = name
+        self._headers = headers
+        self._buffer = bytearray()
+
+    def file_data_received(self, name: str, data: bytes):
+        self._buffer.extend(data)
+
+    def finish_file(self, name: str):
+        content_type = self._headers.get("Content-Type", "application/octet-stream")
+        httpfile = HTTPFile(
+            filename=name, body=bytes(self._buffer), content_type=content_type
+        )
+        self._file_mapping[name] = httpfile
+
+    def get_file(self, name) -> Optional[HTTPFile]:
+        return self._file_mapping.get(name)
+
+
+class AsyncMemoryFileDelegate(AbstractFileDelegate):
+    """Async variant of MemoryFileDelegate, with coroutine callbacks."""
+
+    def __init__(self):
+        super().__init__()
+        self._file_mapping = {}
+
+        self._curr_file = None
+        self._buffer = bytearray()
+        self._headers = None
+
+    @property
+    def keys(self):
+        return list(self._file_mapping.keys())
+
+    async def start_file(self, name: str, headers: HTTPHeaders):
+        self._curr_file = name
+        self._headers = headers
+        self._buffer = bytearray()
+
+    async def file_data_received(self, name: str, data: bytes):
+        self._buffer.extend(data)
+
+    async def finish_file(self, name: str):
+        content_type = self._headers.get("Content-Type", "application/octet-stream")
+        httpfile = HTTPFile(
+            filename=name, body=bytes(self._buffer), content_type=content_type
+        )
+        self._file_mapping[name] = httpfile
+
+    def get_file(self, name) -> Optional[HTTPFile]:
+        return self._file_mapping.get(name)
+
+
+class StreamingMultipartFormDataTest(unittest.IsolatedAsyncioTestCase):
+
+    async def test_multipart_form_data(self):
+        boundary = b"--boundarything"
+
+        # Test every possible two-chunk split of the given data. This
+        # exercises the parser across the boundary-matching corner cases.
+        for i in range(len(MULTIPART_DATA)):
+            delegate = MemoryFileDelegate()
+            parser = StreamingMultipartFormDataParser(delegate, boundary)
+            chunk1 = MULTIPART_DATA[:i]
+            chunk2 = MULTIPART_DATA[i:]
+            await parser.data_received(chunk1)
+            await parser.data_received(chunk2)
+
+            # Verify that the delegate contents are correct.
+            self.assertEqual(
+                set(["a.txt", "b.csv"]),
+                set(delegate.keys),
+                "Expected files not found for slice at: {}".format(i),
+            )
+
+            http_file_a = delegate.get_file("a.txt")
+            self.assertIsNotNone(http_file_a)
+            self.assertEqual(
+                b"a",
+                http_file_a.body,
+                '"a.txt" file contents mismatch on slice: {}'.format(i),
+            )
+            http_file_b = delegate.get_file("b.csv")
+            self.assertIsNotNone(http_file_b)
+            self.assertEqual(
+                b"col1,col2\na,b\n--boundarythin,thatwasclose\n",
+                http_file_b.body,
+                '"b.csv" file contents mismatch on slice: {}'.format(i),
+            )
+
+    async def test_multipart_form_data_async(self):
+        # Same test as above, but with async methods for the delegate.
+        boundary = b"--boundarything"
+
+        # Test every possible two-chunk split of the given data. This
+        # exercises the parser across the boundary-matching corner cases.
+        for i in range(len(MULTIPART_DATA)):
+            delegate = AsyncMemoryFileDelegate()
+            parser = StreamingMultipartFormDataParser(delegate, boundary)
+            chunk1 = MULTIPART_DATA[:i]
+            chunk2 = MULTIPART_DATA[i:]
+            await parser.data_received(chunk1)
+            await parser.data_received(chunk2)
+
+            # Verify that the delegate contents are correct.
+            self.assertEqual(
+                set(["a.txt", "b.csv"]),
+                set(delegate.keys),
+                "Expected files not found for slice at: {}".format(i),
+            )
+            # Assert that the file contents match what is expected.
+            http_file_a = delegate.get_file("a.txt")
+            self.assertIsNotNone(http_file_a)
+            self.assertEqual(
+                b"a",
+                http_file_a.body,
+                '"a.txt" file contents mismatch on slice: {}'.format(i),
+            )
+            http_file_b = delegate.get_file("b.csv")
+            self.assertIsNotNone(http_file_b)
+            self.assertEqual(
+                b"col1,col2\na,b\n--boundarythin,thatwasclose\n",
+                http_file_b.body,
+                '"b.csv" file contents mismatch on slice: {}'.format(i),
+            )
+
+    async def test_runaway_memory_parser(self):
+        # Test that the parser raises instead of buffering unbounded data
+        # when the headers never terminate within 'max_buffer_size'.
+        boundary = b"--boundarything"
+
+        # Build data whose second set of headers is far too large.
+        BAD_DATA = b"""----boundarything\r
+Content-Disposition: form-data; name="a.txt"\r
+\r
+a----boundarything\r
+Content-Disposition: form-data; name="b.csv"\r
+Content-Type: text/csv\r
+""" + (b'a' * 10000) + b"""
+\r
+col1,col2
+a,b
+--boundarythin,thatwasclose
+----boundarything--\r
+"""
+        delegate = AsyncMemoryFileDelegate()
+        # Configure the parser, but set the maximum buffer size to
+        # something small. This checks that the parser raises after high
+        # memory consumption.
+        parser = StreamingMultipartFormDataParser(
+            delegate, boundary, max_buffer_size=1024)
+        await parser.data_received(BAD_DATA[:5000])
+        with self.assertRaises(Exception):
+            await parser.data_received(BAD_DATA[5000:])
+
+
 class HTTPHeadersTest(unittest.TestCase):
     def test_multi_line(self):
         # Lines beginning with whitespace are appended to the previous line
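Usage sketch, for reviewers: the snippet below shows one way the new parser might be driven from a handler decorated with tornado's existing @tornado.web.stream_request_body. This is a minimal illustration, not part of the patch; CountingDelegate and UploadHandler are invented names for this example, and only StreamingMultipartFormDataParser and AbstractFileDelegate come from the diff above.

    import tornado.web
    from tornado.httputil import (
        AbstractFileDelegate,
        HTTPHeaders,
        StreamingMultipartFormDataParser,
    )


    class CountingDelegate(AbstractFileDelegate):
        """Toy delegate that tracks the number of bytes seen per file."""

        def __init__(self):
            self.sizes = {}

        def start_file(self, name: str, headers: HTTPHeaders):
            # A new multipart part was started; 'headers' holds its
            # parsed HTTPHeaders (Content-Disposition, Content-Type, ...).
            self.sizes[name] = 0

        def file_data_received(self, name: str, data: bytes):
            # Body bytes for the current part arrive incrementally.
            self.sizes[name] += len(data)

        def finish_file(self, name: str):
            # The part named 'name' is complete; nothing to flush here.
            pass


    @tornado.web.stream_request_body
    class UploadHandler(tornado.web.RequestHandler):
        def prepare(self):
            # Build the parser from the request's Content-Type header,
            # which carries the required 'boundary' option.
            self._delegate = CountingDelegate()
            self._parser = StreamingMultipartFormDataParser.from_content_type_header(
                self._delegate, self.request.headers["Content-Type"]
            )

        async def data_received(self, chunk: bytes):
            # Feed raw body chunks straight into the streaming parser.
            await self._parser.data_received(chunk)

        def post(self):
            # Report how many bytes each uploaded file contained.
            self.write(self._delegate.sizes)

Because the delegate callbacks may return either None or an awaitable, the same handler works with both synchronous delegates (like the one above) and coroutine-based ones, mirroring the two delegate flavors exercised in the tests.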