From 75c944c3440830197cabb87ac582d4add5fc7a95 Mon Sep 17 00:00:00 2001
From: aliel
Date: Tue, 27 Feb 2024 01:17:47 +0100
Subject: [PATCH] WIP: Fix Upload file parsing

---
 src/aleph/web/controllers/storage.py | 144 +++++++++++++++++++++------
 1 file changed, 114 insertions(+), 30 deletions(-)

diff --git a/src/aleph/web/controllers/storage.py b/src/aleph/web/controllers/storage.py
index b9b40f293..c05612682 100644
--- a/src/aleph/web/controllers/storage.py
+++ b/src/aleph/web/controllers/storage.py
@@ -1,48 +1,48 @@
 import base64
+import io
 import logging
+import tempfile
 from decimal import Decimal
-from typing import Union, Optional, Protocol
+from typing import Optional, Protocol, Union, cast
+from urllib.parse import parse_qsl
 
 import aio_pika
 import pydantic
-from aiohttp import web
+from aiohttp import hdrs, web
+from aiohttp.multipart import BodyPartReader
+from aiohttp.web_exceptions import HTTPRequestEntityTooLarge
 from aiohttp.web_request import FileField
-from aleph_message.models import ItemType, StoreContent
-from mypy.dmypy_server import MiB
-from pydantic import ValidationError
-
 from aleph.chains.signature_verifier import SignatureVerifier
 from aleph.db.accessors.balances import get_total_balance
 from aleph.db.accessors.cost import get_total_cost_for_address
-from aleph.db.accessors.files import (
-    count_file_pins,
-    get_file,
-)
+from aleph.db.accessors.files import count_file_pins, get_file
 from aleph.exceptions import AlephStorageException, UnknownHashError
 from aleph.schemas.pending_messages import (
     BasePendingMessage,
-    PendingStoreMessage,
     PendingInlineStoreMessage,
+    PendingStoreMessage,
 )
 from aleph.storage import StorageService
 from aleph.types.db_session import DbSession
-from aleph.types.message_status import (
-    InvalidSignature,
-)
-from aleph.utils import run_in_executor, item_type_from_hash, get_sha256
+from aleph.types.message_status import InvalidSignature
+from aleph.utils import get_sha256, item_type_from_hash, run_in_executor
 from aleph.web.controllers.app_state_getters import (
-    get_session_factory_from_request,
-    get_storage_service_from_request,
     get_config_from_request,
     get_mq_channel_from_request,
+    get_session_factory_from_request,
     get_signature_verifier_from_request,
+    get_storage_service_from_request,
 )
 from aleph.web.controllers.utils import (
-    mq_make_aleph_message_topic_queue,
+    add_grace_period_for_file,
     broadcast_and_process_message,
     broadcast_status_to_http_status,
-    add_grace_period_for_file,
+    mq_make_aleph_message_topic_queue,
 )
+from aleph_message.models import ItemType, StoreContent
+from multidict import MultiDict, MultiDictProxy
+from mypy.dmypy_server import MiB
+from pydantic import ValidationError
 
 logger = logging.getLogger(__name__)
 
@@ -228,6 +228,98 @@ async def _make_mq_queue(
         channel=mq_channel, config=config, routing_key=routing_key
     )
 
+
+async def parse_uploaded_file(request):
+    if request.content_type not in (
+        "",
+        "application/x-www-form-urlencoded",
+        "multipart/form-data",
+    ):
+        raise web.HTTPUnprocessableEntity(
+            reason="Invalid Content-Type header, expected "
+            "application/x-www-form-urlencoded or multipart/form-data"
+        )
+
+    out: MultiDict[Union[str, bytes, UploadedFile]] = MultiDict()
+
+    if request.content_type == "multipart/form-data":
+        multipart = await request.multipart()
+        max_size = MAX_UPLOAD_FILE_SIZE
+
+        field = await multipart.next()
+        while field is not None:
+            if isinstance(field, FileField):
+                try:
+                    content_length = int(
+                        request.headers.get(
+                            "Content-Length", field.headers["Content-Length"]
+                        )
+                    )
+                    uploaded_file: UploadedFile = MultipartUploadedFile(
+                        field, content_length
+                    )
+                    out.add(field.name, uploaded_file)
+                    # Advance to the next part before continuing, otherwise
+                    # the loop would keep processing the same field.
+                    field = await multipart.next()
+                    continue
+                except (KeyError, ValueError):
+                    pass
+
+            size = 0
+            field_ct = field.headers.get(hdrs.CONTENT_TYPE)
+
+            if isinstance(field, BodyPartReader):
+                assert field.name is not None
+
+                # Note that according to RFC 7578, the Content-Type header
+                # is optional, even for files, so we can't assume it's
+                # present.
+                # https://tools.ietf.org/html/rfc7578#section-4.4
+                if field.filename:
+                    # Store the file part in a temporary file, enforcing the
+                    # size limit while streaming.
+                    tmp = tempfile.TemporaryFile()
+                    chunk = await field.read_chunk(size=2**16)
+                    while chunk:
+                        chunk = field.decode(chunk)
+                        tmp.write(chunk)
+                        size += len(chunk)
+                        if 0 < max_size < size:
+                            tmp.close()
+                            raise HTTPRequestEntityTooLarge(
+                                max_size=max_size, actual_size=size
+                            )
+                        chunk = await field.read_chunk(size=2**16)
+                    tmp.seek(0)
+
+                    if field_ct is None:
+                        field_ct = "application/octet-stream"
+
+                    ff = FileField(
+                        field.name,
+                        field.filename,
+                        cast(io.BufferedReader, tmp),
+                        field_ct,
+                        field.headers,
+                    )
+                    uploaded_file = MultipartUploadedFile(ff, size)
+                    out.add(field.name, uploaded_file)
+                else:
+                    # Deal with ordinary (non-file) form data.
+                    value = await field.read(decode=True)
+                    size += len(value)
+
+                    if 0 < max_size < size:
+                        raise HTTPRequestEntityTooLarge(
+                            max_size=max_size, actual_size=size
+                        )
+                    # Keep the field so that e.g. post.get("metadata") still
+                    # works in the handlers below.
+                    out.add(field.name, value)
+            else:
+                raise ValueError(
+                    "To decode nested multipart you need to use custom reader",
+                )
+
+            field = await multipart.next()
+    else:
+        data = await request.read()
+        if data:
+            charset = request.charset or "utf-8"
+            out.extend(
+                parse_qsl(
+                    data.rstrip().decode(charset),
+                    keep_blank_values=True,
+                    encoding=charset,
+                )
+            )
+
+    request._post = MultiDictProxy(out)
+    return request._post
+
 
 async def storage_add_file(request: web.Request):
     storage_service = get_storage_service_from_request(request)
@@ -236,21 +328,13 @@ async def storage_add_file(request: web.Request):
     config = get_config_from_request(request)
     grace_period = config.storage.grace_period.value
 
-    post = await request.post()
-    headers = request.headers
+    # post = await request.post()
+    post = await parse_uploaded_file(request)
     try:
-        file_field = post["file"]
+        uploaded_file = post["file"]
     except KeyError:
         raise web.HTTPUnprocessableEntity(reason="Missing 'file' in multipart form.")
 
-    if isinstance(file_field, FileField):
-        try:
-            content_length = int(headers.get("Content-Length", file_field.headers["Content-Length"]))
-            uploaded_file: UploadedFile = MultipartUploadedFile(file_field, content_length)
-        except (KeyError, ValueError):
-            raise web.HTTPUnprocessableEntity(reason="Invalid/missing Content-Length header.")
-    else:
-        uploaded_file = RawUploadedFile(file_field)
-
     metadata = post.get("metadata")
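
Below is a small client-side sketch (not part of the patch) showing how the multipart path handled by parse_uploaded_file() can be exercised. The field names "file" and "metadata" match what storage_add_file() reads from the parsed POST data; the node URL, port and the /api/v0/storage/add_file route are assumptions for illustration, and the metadata payload is illustrative only.

    # Hypothetical usage sketch, assuming a local node on http://localhost:4024
    # exposing the storage_add_file handler at /api/v0/storage/add_file.
    import asyncio

    import aiohttp


    async def upload_example() -> None:
        form = aiohttp.FormData()
        # "file" is the part that parse_uploaded_file() spools to a temp file
        # on the server side and wraps in MultipartUploadedFile.
        form.add_field(
            "file",
            b"hello, aleph",
            filename="hello.txt",
            content_type="application/octet-stream",
        )
        # "metadata" is an ordinary (non-file) field, later read via
        # post.get("metadata"); its exact schema is out of scope here.
        form.add_field("metadata", '{"name": "hello.txt"}')

        async with aiohttp.ClientSession() as session:
            async with session.post(
                "http://localhost:4024/api/v0/storage/add_file", data=form
            ) as resp:
                print(resp.status, await resp.text())


    if __name__ == "__main__":
        asyncio.run(upload_example())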