Skip to content

Commit

Permalink
WIP: Fix Upload file parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
aliel committed Feb 27, 2024
1 parent e7877d5 commit 75c944c
Showing 1 changed file with 114 additions and 30 deletions.
144 changes: 114 additions & 30 deletions src/aleph/web/controllers/storage.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,48 @@
import base64
import io
import logging
import tempfile
from decimal import Decimal
from typing import Union, Optional, Protocol
from typing import Optional, Protocol, Union, cast
from urllib.parse import parse_qsl

import aio_pika
import pydantic
from aiohttp import web
from aiohttp import hdrs, web
from aiohttp.multipart import BodyPartReader
from aiohttp.web_exceptions import HTTPRequestEntityTooLarge
from aiohttp.web_request import FileField
from aleph_message.models import ItemType, StoreContent
from mypy.dmypy_server import MiB
from pydantic import ValidationError

from aleph.chains.signature_verifier import SignatureVerifier
from aleph.db.accessors.balances import get_total_balance
from aleph.db.accessors.cost import get_total_cost_for_address
from aleph.db.accessors.files import (
count_file_pins,
get_file,
)
from aleph.db.accessors.files import count_file_pins, get_file
from aleph.exceptions import AlephStorageException, UnknownHashError
from aleph.schemas.pending_messages import (
BasePendingMessage,
PendingStoreMessage,
PendingInlineStoreMessage,
PendingStoreMessage,
)
from aleph.storage import StorageService
from aleph.types.db_session import DbSession
from aleph.types.message_status import (
InvalidSignature,
)
from aleph.utils import run_in_executor, item_type_from_hash, get_sha256
from aleph.types.message_status import InvalidSignature
from aleph.utils import get_sha256, item_type_from_hash, run_in_executor
from aleph.web.controllers.app_state_getters import (
get_session_factory_from_request,
get_storage_service_from_request,
get_config_from_request,
get_mq_channel_from_request,
get_session_factory_from_request,
get_signature_verifier_from_request,
get_storage_service_from_request,
)
from aleph.web.controllers.utils import (
mq_make_aleph_message_topic_queue,
add_grace_period_for_file,
broadcast_and_process_message,
broadcast_status_to_http_status,
add_grace_period_for_file,
mq_make_aleph_message_topic_queue,
)
from aleph_message.models import ItemType, StoreContent
from multidict import MultiDict, MultiDictProxy
from mypy.dmypy_server import MiB
from pydantic import ValidationError

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -228,6 +228,98 @@ async def _make_mq_queue(
channel=mq_channel, config=config, routing_key=routing_key
)

async def parse_uploaded_file(request: web.Request) -> MultiDictProxy:
    """Parse the request body into a multidict of form fields.

    Replacement for ``request.post()`` that wraps multipart file parts in
    ``MultipartUploadedFile`` objects (size-checked against
    ``MAX_UPLOAD_FILE_SIZE``) instead of buffering the whole upload.

    :param request: Incoming aiohttp request. Its Content-Type must be empty,
        ``application/x-www-form-urlencoded`` or ``multipart/form-data``.
    :return: Read-only multidict mapping field names to values
        (``str`` for ordinary fields, ``UploadedFile`` for file parts).
    :raises web.HTTPUnprocessableEntity: On an unsupported Content-Type.
    :raises HTTPRequestEntityTooLarge: When a part exceeds the size limit.
    """
    if request.content_type not in (
        "",
        "application/x-www-form-urlencoded",
        "multipart/form-data",
    ):
        raise web.HTTPUnprocessableEntity(
            reason=(
                "Invalid Content-Type header, expected "
                "application/x-www-form-urlencoded or multipart/form-data"
            )
        )

    out: MultiDict[Union[str, bytes, UploadedFile]] = MultiDict()

    if request.content_type == "multipart/form-data":
        multipart = await request.multipart()
        max_size = MAX_UPLOAD_FILE_SIZE

        field = await multipart.next()
        while field is not None:
            # NOTE(review): multipart.next() yields BodyPartReader /
            # MultipartReader objects, so this FileField branch looks
            # unreachable — confirm against callers before removing it.
            if isinstance(field, FileField):
                try:
                    content_length = int(
                        request.headers.get(
                            "Content-Length", field.headers["Content-Length"]
                        )
                    )
                    out.add(field.name, MultipartUploadedFile(field, content_length))
                    # Bug fix: advance to the next part before `continue`;
                    # the original continued without fetching the next field,
                    # which would loop forever on the same part.
                    field = await multipart.next()
                    continue
                except (KeyError, ValueError):
                    pass

            # Per-part size counter, checked against max_size below.
            size = 0
            field_ct = field.headers.get(hdrs.CONTENT_TYPE)

            if isinstance(field, BodyPartReader):
                assert field.name is not None

                # Note that according to RFC 7578, the Content-Type header
                # is optional, even for files, so we can't assume it's
                # present.
                # https://tools.ietf.org/html/rfc7578#section-4.4
                if field.filename:
                    # Stream the file part into an anonymous temp file so the
                    # whole upload is never held in memory at once.
                    tmp = tempfile.TemporaryFile()
                    chunk = await field.read_chunk(size=2**16)
                    while chunk:
                        chunk = field.decode(chunk)
                        tmp.write(chunk)
                        size += len(chunk)
                        if 0 < max_size < size:
                            tmp.close()
                            raise HTTPRequestEntityTooLarge(
                                max_size=max_size, actual_size=size
                            )
                        chunk = await field.read_chunk(size=2**16)
                    tmp.seek(0)

                    if field_ct is None:
                        field_ct = "application/octet-stream"

                    ff = FileField(
                        field.name,
                        field.filename,
                        cast(io.BufferedReader, tmp),
                        field_ct,
                        field.headers,
                    )
                    out.add(field.name, MultipartUploadedFile(ff, size))
                else:
                    # Ordinary (non-file) form value.
                    value = await field.read(decode=True)
                    size += len(value)

                    if 0 < max_size < size:
                        raise HTTPRequestEntityTooLarge(
                            max_size=max_size, actual_size=size
                        )
                    # Bug fix: the value was read but never stored, so plain
                    # form fields (e.g. "metadata") were silently dropped and
                    # post.get("metadata") always returned None.
                    out.add(
                        field.name,
                        value.decode(field.get_charset(default="utf-8")),
                    )
            else:
                raise ValueError(
                    "To decode nested multipart you need " "to use custom reader",
                )

            field = await multipart.next()
    else:
        data = await request.read()
        if data:
            charset = request.charset or "utf-8"
            out.extend(
                parse_qsl(
                    data.rstrip().decode(charset),
                    keep_blank_values=True,
                    encoding=charset,
                )
            )

    # Mirrors aiohttp's own Request.post() caching of the parsed body.
    request._post = MultiDictProxy(out)
    return request._post

async def storage_add_file(request: web.Request):
storage_service = get_storage_service_from_request(request)
Expand All @@ -236,21 +328,13 @@ async def storage_add_file(request: web.Request):
config = get_config_from_request(request)
grace_period = config.storage.grace_period.value

post = await request.post()
headers = request.headers
#post = await request.post()
post = await parse_uploaded_file(request)
try:
file_field = post["file"]
uploaded_file = post["file"]
except KeyError:
raise web.HTTPUnprocessableEntity(reason="Missing 'file' in multipart form.")

if isinstance(file_field, FileField):
try:
content_length = int(headers.get("Content-Length", file_field.headers["Content-Length"]))
uploaded_file: UploadedFile = MultipartUploadedFile(file_field, content_length)
except (KeyError, ValueError):
raise web.HTTPUnprocessableEntity(reason="Invalid/missing Content-Length header.")
else:
uploaded_file = RawUploadedFile(file_field)

metadata = post.get("metadata")

Expand Down

0 comments on commit 75c944c

Please sign in to comment.