Merge pull request #86 from openstreetmap-polska/dev
Release to main
Zaczero authored Nov 6, 2024
2 parents fb205ba + 48f8f29 commit 10af27c
Showing 18 changed files with 216 additions and 782 deletions.
17 changes: 10 additions & 7 deletions api/v1/photos.py

@@ -1,3 +1,4 @@
+import json
 from datetime import timedelta
 from io import BytesIO
 from typing import Annotated
@@ -17,21 +18,23 @@
 from services.aed_service import AEDService
 from services.photo_report_service import PhotoReportService
 from services.photo_service import PhotoService
-from utils import JSON_DECODE, get_wikimedia_commons_url, http_get
+from utils import HTTP, get_wikimedia_commons_url

 router = APIRouter(prefix='/photos')


 async def _fetch_image(url: str) -> tuple[bytes, str]:
     # NOTE: ideally we would verify whether url is not a private resource
-    async with http_get(url, allow_redirects=True, raise_for_status=True) as r:
+    async with HTTP.stream('GET', url) as r:
+        r.raise_for_status()

         # Early detection of unsupported types
         content_type = r.headers.get('Content-Type')
         if content_type and content_type not in IMAGE_CONTENT_TYPES:
             raise HTTPException(500, f'Unsupported file type {content_type!r}, must be one of {IMAGE_CONTENT_TYPES}')

         with BytesIO() as buffer:
-            async for chunk, _ in r.content.iter_chunks():
+            async for chunk in r.aiter_bytes(1024 * 1024):
                 buffer.write(chunk)
                 if buffer.tell() > IMAGE_REMOTE_MAX_FILE_SIZE:
                     raise HTTPException(
@@ -69,10 +72,10 @@ async def proxy_direct(url_encoded: str):
 @cache_control(timedelta(days=7), stale=timedelta(days=7))
 async def proxy_wikimedia_commons(path_encoded: str):
     meta_url = get_wikimedia_commons_url(unquote_plus(path_encoded))
-    async with http_get(meta_url, allow_redirects=True, raise_for_status=True) as r:
-        html = await r.text()
+    r = await HTTP.get(meta_url)
+    r.raise_for_status()

-    bs = BeautifulSoup(html, 'lxml')
+    bs = BeautifulSoup(r.text, 'lxml')
     og_image = bs.find('meta', property='og:image')
     if not isinstance(og_image, Tag):
         return Response('Missing og:image meta tag', 404)
@@ -106,7 +109,7 @@ async def upload(
         return Response(f'Unsupported file type {content_type!r}, must be one of {IMAGE_CONTENT_TYPES}', 400)

     try:
-        oauth2_credentials_: dict = JSON_DECODE(oauth2_credentials)
+        oauth2_credentials_: dict = json.loads(oauth2_credentials)
         oauth2_token = SecretStr(oauth2_credentials_['access_token'])
     except Exception:
         return Response('OAuth2 credentials must be a JSON object', 400)
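The new HTTP client is imported from utils, but its definition is not part of the files shown here. A minimal sketch of what it plausibly looks like, assuming httpx (consistent with the HTTP.stream/aiter_bytes calls above and the httpx/httpcore loggers in config.py below), with redirects enabled to match the dropped allow_redirects=True flags; the timeout and headers are illustrative guesses:

    # utils.py (hypothetical sketch; the real definition is not in this diff)
    from httpx import AsyncClient, Timeout

    HTTP = AsyncClient(
        follow_redirects=True,  # replaces aiohttp's per-request allow_redirects=True
        timeout=Timeout(60),    # illustrative; the real timeout is not shown
        headers={'User-Agent': 'openaedmap-backend'},  # illustrative
    )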
3 changes: 2 additions & 1 deletion config.py

@@ -7,7 +7,7 @@
 from pyproj import Transformer

 NAME = 'openaedmap-backend'
-VERSION = '2.12.0'
+VERSION = '2.13.0'
 CREATED_BY = f'{NAME} {VERSION}'
 WEBSITE = 'https://openaedmap.org'

@@ -91,6 +91,7 @@
         'httpx',
         'httpcore',
         'multipart',
+        'python_multipart',
         'PIL',
     )
 },
5 changes: 5 additions & 0 deletions config/postgres.conf

@@ -13,6 +13,11 @@ unix_socket_directories = '/tmp/openaedmap-postgres'
 shared_buffers = 512MB
 effective_cache_size = 1GB

+# disable parallel gather:
+# introduces noticeable overhead and is never useful
+# we only perform relatively small queries and rely heavily on indexes
+max_parallel_workers_per_gather = 0

 # increase statistics target
 # reason: more accurate query plans
 default_statistics_target = 200
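To confirm the setting is live after a restart, one could run SHOW through the project's async engine. A hypothetical sketch; db_read is assumed here by analogy with the db_write helper visible in db.py below:

    from sqlalchemy import text

    from db import db_read  # hypothetical read-session helper; only db_write is shown in this diff


    async def parallel_gather_setting() -> str:
        async with db_read() as session:
            result = await session.execute(text('SHOW max_parallel_workers_per_gather'))
            return result.scalar_one()  # expected: '0'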
9 changes: 3 additions & 6 deletions db.py

@@ -1,15 +1,12 @@
 from contextlib import asynccontextmanager

-from redis.asyncio import ConnectionPool, Redis
 from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
+from valkey.asyncio import ConnectionPool, Valkey

 from config import POSTGRES_URL, VALKEY_URL
-from utils import JSON_DECODE, JSON_ENCODE

 _db_engine = create_async_engine(
     POSTGRES_URL,
-    json_deserializer=JSON_DECODE,
-    json_serializer=lambda x: JSON_ENCODE(x).decode(),
     query_cache_size=128,
     pool_size=10,
     max_overflow=-1,
@@ -43,10 +40,10 @@ async def db_write():
         await session.commit()


-_redis_pool = ConnectionPool().from_url(VALKEY_URL)
+_valkey_pool = ConnectionPool().from_url(VALKEY_URL)


 @asynccontextmanager
 async def valkey():
-    async with Redis(connection_pool=_redis_pool) as r:
+    async with Valkey(connection_pool=_valkey_pool) as r:
         yield r
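valkey-py is the drop-in fork of redis-py, so callers keep the same command surface. A hypothetical usage sketch of the helper above, with an illustrative key name:

    async def count_visit() -> int:
        # Hypothetical caller; valkey-py mirrors redis-py's async API.
        async with valkey() as conn:
            return await conn.incr('stats:visits')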
2 changes: 1 addition & 1 deletion default.nix

@@ -13,7 +13,7 @@ let
     mkdir -p $out/bin $out/lib
     find "${./.venv/bin}" -type f -executable -exec cp {} $out/bin \;
     sed -i '1s|^#!.*/python|#!/usr/bin/env python|' $out/bin/*
-    cp -r "${./.venv/lib/python3.12/site-packages}"/* $out/lib
+    cp -r "${./.venv/lib/python3.13/site-packages}"/* $out/lib
   '')
 ];
 pathsToLink = [ "/bin" "/lib" ];
10 changes: 1 addition & 9 deletions json_response.py

@@ -1,13 +1,5 @@
-from typing import override
-
 from fastapi.responses import JSONResponse

-from utils import JSON_ENCODE


-class CustomJSONResponse(JSONResponse):
+class JSONResponseUTF8(JSONResponse):
     media_type = 'application/json; charset=utf-8'
-
-    @override
-    def render(self, content) -> bytes:
-        return JSON_ENCODE(content)
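With the custom render gone, the class keeps only the explicit UTF-8 media type and inherits the stock JSONResponse serialization, which Starlette renders with ensure_ascii=False. A quick sketch of the resulting behavior:

    from json_response import JSONResponseUTF8

    resp = JSONResponseUTF8({'city': 'Złotów'})
    assert resp.headers['content-type'] == 'application/json; charset=utf-8'
    assert 'Złotów'.encode() in resp.body  # non-ASCII passes through as raw UTF-8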
4 changes: 2 additions & 2 deletions main.py

@@ -9,7 +9,7 @@
 from fastapi.middleware.cors import CORSMiddleware
 from starlette_compress import CompressMiddleware

-from json_response import CustomJSONResponse
+from json_response import JSONResponseUTF8
 from middlewares.cache_control_middleware import CacheControlMiddleware
 from middlewares.cache_response_middleware import CacheResponseMiddleware
 from middlewares.profiler_middleware import ProfilerMiddleware
@@ -38,7 +38,7 @@ async def lifespan(_):
     yield


-app = FastAPI(lifespan=lifespan, default_response_class=CustomJSONResponse)
+app = FastAPI(lifespan=lifespan, default_response_class=JSONResponseUTF8)
 app.add_middleware(CacheControlMiddleware)
 app.add_middleware(CacheResponseMiddleware)
 app.add_middleware(
4 changes: 2 additions & 2 deletions middlewares/cache_response_middleware.py

@@ -154,7 +154,7 @@ def satisfy_response_start(self, message: Message) -> None:

 @trace
 async def _get_cached_response(url: URL) -> CachedResponse | None:
-    key = f'cache:{url.path}:{url.query}'
+    key = f'cache2:{url.path}:{url.query}'

     async with valkey() as conn:
         value: bytes | None = await conn.get(key)
@@ -168,7 +168,7 @@ async def _get_cached_response(url: URL) -> CachedResponse | None:

 @trace
 async def _set_cached_response(url: URL, cached: CachedResponse) -> None:
-    key = f'cache:{url.path}:{url.query}'
+    key = f'cache2:{url.path}:{url.query}'
     value = _compress(cached.to_bytes())
     ttl = int((cached.max_age + cached.stale).total_seconds())
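The prefix bump from cache: to cache2: appears to pair with the serialization switch in models/cached_response.py below: entries written under the old prefix are msgpack blobs that the new pickle-based from_bytes could not parse, so the new code reads and writes under a fresh namespace while the old keys age out through their TTL. A trivial sketch:

    # Old and new cache entries can never collide; path and query are illustrative.
    url_path, url_query = '/api/v1/countries', ''
    assert f'cache:{url_path}:{url_query}' != f'cache2:{url_path}:{url_query}'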
4 changes: 2 additions & 2 deletions middlewares/skip_serialization.py

@@ -3,7 +3,7 @@

 from fastapi import Response

-from json_response import CustomJSONResponse
+from json_response import JSONResponseUTF8


 def skip_serialization(headers: Mapping[str, str] | None = None):
@@ -13,7 +13,7 @@ async def wrapper(*args, **kwargs):
         raw_response = await func(*args, **kwargs)
         if isinstance(raw_response, Response):
             return raw_response
-        return CustomJSONResponse(raw_response, headers=headers)
+        return JSONResponseUTF8(raw_response, headers=headers)

     return wrapper
16 changes: 6 additions & 10 deletions models/cached_response.py

@@ -1,11 +1,10 @@
+import pickle
+from dataclasses import dataclass
 from datetime import datetime, timedelta

-from msgspec import Struct
-
-from utils import MSGPACK_ENCODE, typed_msgpack_decoder


-class CachedResponse(Struct, forbid_unknown_fields=True, array_like=True):
+@dataclass(kw_only=True, slots=True)
+class CachedResponse:
     date: datetime
     max_age: timedelta
     stale: timedelta
@@ -14,11 +13,8 @@ class CachedResponse(Struct, forbid_unknown_fields=True, array_like=True):
     content: bytes

     def to_bytes(self) -> bytes:
-        return MSGPACK_ENCODE(self)
+        return pickle.dumps(self, protocol=pickle.HIGHEST_PROTOCOL)

     @classmethod
     def from_bytes(cls, buffer: bytes) -> 'CachedResponse':
-        return _decode(buffer)
-
-
-_decode = typed_msgpack_decoder(CachedResponse).decode
+        return pickle.loads(buffer)  # noqa: S301
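The move from msgspec msgpack structs to pickled slotted dataclasses trades a compact, schema-validated wire format for zero serialization code. Since part of the field list is collapsed in this diff, here is the same pattern demonstrated on a self-contained stand-in class:

    import pickle
    from dataclasses import dataclass
    from datetime import UTC, datetime, timedelta


    @dataclass(kw_only=True, slots=True)
    class Example:  # stand-in for CachedResponse; its full field list is collapsed above
        date: datetime
        max_age: timedelta
        content: bytes

        def to_bytes(self) -> bytes:
            return pickle.dumps(self, protocol=pickle.HIGHEST_PROTOCOL)

        @classmethod
        def from_bytes(cls, buffer: bytes) -> 'Example':
            # Safe only for trusted data: pickle can execute arbitrary code on load.
            return pickle.loads(buffer)


    item = Example(date=datetime.now(UTC), max_age=timedelta(minutes=5), content=b'{}')
    assert Example.from_bytes(item.to_bytes()) == item  # slotted dataclasses round-trip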
47 changes: 22 additions & 25 deletions openstreetmap.py

@@ -6,7 +6,7 @@
 from starlette import status

 from config import CHANGESET_ID_PLACEHOLDER, DEFAULT_CHANGESET_TAGS, OPENSTREETMAP_API_URL
-from utils import http_get, http_post, http_put, retry_exponential
+from utils import HTTP, retry_exponential
 from xmltodict_postprocessor import xmltodict_postprocessor


@@ -21,27 +21,24 @@ def __init__(self, access_token: SecretStr):
     @retry_exponential(10)
     @trace
     async def get_authorized_user(self) -> dict | None:
-        async with http_get(
+        r = await HTTP.get(
             f'{OPENSTREETMAP_API_URL}user/details.json',
-            allow_redirects=True,
             headers={'Authorization': f'Bearer {self.access_token.get_secret_value()}'},
-        ) as r:
-            if r.status == status.HTTP_401_UNAUTHORIZED:
-                return None
-            r.raise_for_status()
-            return (await r.json())['user']
+        )
+        if r.status_code == status.HTTP_401_UNAUTHORIZED:
+            return None
+        r.raise_for_status()
+        return r.json()['user']

     @retry_exponential(10)
     @trace
     async def get_node_xml(self, node_id: int) -> dict | None:
-        async with http_get(f'{OPENSTREETMAP_API_URL}node/{node_id}', allow_redirects=True) as r:
-            if r.status in (status.HTTP_404_NOT_FOUND, status.HTTP_410_GONE):
-                return None
-            r.raise_for_status()
-            content = await r.read()
-
+        r = await HTTP.get(f'{OPENSTREETMAP_API_URL}node/{node_id}')
+        if r.status_code in (status.HTTP_404_NOT_FOUND, status.HTTP_410_GONE):
+            return None
+        r.raise_for_status()
         return xmltodict.parse(
-            content,
+            r.content,
             postprocessor=xmltodict_postprocessor,
             force_list=('tag',),
         )['osm']['node']
@@ -52,35 +49,35 @@ async def upload_osm_change(self, osm_change: str) -> str:
             {'osm': {'changeset': {'tag': [{'@k': k, '@v': v} for k, v in DEFAULT_CHANGESET_TAGS.items()]}}}
         )

-        async with http_put(
+        r = await HTTP.put(
             f'{OPENSTREETMAP_API_URL}changeset/create',
-            data=changeset,
             headers={
                 'Authorization': f'Bearer {self.access_token.get_secret_value()}',
                 'Content-Type': 'text/xml; charset=utf-8',
             },
-            raise_for_status=True,
-        ) as r:
-            changeset_id = await r.text()
+            content=changeset,
+        )
+        r.raise_for_status()
+        changeset_id = r.text

         osm_change = osm_change.replace(CHANGESET_ID_PLACEHOLDER, changeset_id)
         logging.info('Uploading changeset %s', changeset_id)
         logging.info('https://www.openstreetmap.org/changeset/%s', changeset_id)

-        await http_post(
+        r = await HTTP.post(
             f'{OPENSTREETMAP_API_URL}changeset/{changeset_id}/upload',
-            data=osm_change,
             headers={
                 'Authorization': f'Bearer {self.access_token.get_secret_value()}',
                 'Content-Type': 'text/xml; charset=utf-8',
             },
-            raise_for_status=True,
+            content=osm_change,
         )
+        r.raise_for_status()

-        await http_put(
+        r = await HTTP.put(
             f'{OPENSTREETMAP_API_URL}changeset/{changeset_id}/close',
             headers={'Authorization': f'Bearer {self.access_token.get_secret_value()}'},
-            raise_for_status=True,
         )
+        r.raise_for_status()

         return changeset_id
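A hypothetical call site for the migrated client. The enclosing class name is not visible in this diff, so OpenStreetMap is assumed, and SecretStr is taken to be pydantic's, matching the __init__ signature above:

    from pydantic import SecretStr


    async def whoami(access_token: str) -> str | None:
        osm = OpenStreetMap(SecretStr(access_token))  # class name assumed
        user = await osm.get_authorized_user()
        return user['display_name'] if user is not None else None  # standard OSM user field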
11 changes: 6 additions & 5 deletions osm_countries.py

@@ -1,3 +1,4 @@
+import json
 from collections.abc import Sequence
 from typing import cast

@@ -8,18 +9,18 @@

 from config import COUNTRY_GEOJSON_URL
 from models.osm_country import OSMCountry
-from utils import JSON_DECODE, http_get
+from utils import HTTP

 _zstd_decompress = ZstdDecompressor().decompress


 @trace
 async def get_osm_countries() -> Sequence[OSMCountry]:
-    async with http_get(COUNTRY_GEOJSON_URL, allow_redirects=True, raise_for_status=True) as r:
-        buffer = await r.read()
+    r = await HTTP.get(COUNTRY_GEOJSON_URL)
+    r.raise_for_status()

-    buffer = _zstd_decompress(buffer)
-    data: dict = JSON_DECODE(buffer)
+    buffer = _zstd_decompress(r.content)
+    data: dict = json.loads(buffer)
     result = []

     for feature in data['features']:
14 changes: 4 additions & 10 deletions overpass.py

@@ -1,11 +1,10 @@
 from collections.abc import Sequence
 from datetime import UTC, datetime

-from aiohttp import ClientTimeout
 from sentry_sdk import trace

 from config import OVERPASS_API_URL
-from utils import http_post, retry_exponential
+from utils import HTTP, retry_exponential


 @retry_exponential(None)
@@ -14,14 +13,9 @@ async def query_overpass(query: str, *, timeout: int, must_return: bool = False)
     join = '' if query.startswith('[') else ';'
     query = f'[out:json][timeout:{timeout}]{join}{query}'

-    async with http_post(
-        OVERPASS_API_URL,
-        data={'data': query},
-        timeout=ClientTimeout(total=timeout * 2),
-        allow_redirects=True,
-        raise_for_status=True,
-    ) as r:
-        data = await r.json()
+    r = await HTTP.post(OVERPASS_API_URL, data={'data': query}, timeout=timeout * 2)
+    r.raise_for_status()
+    data = r.json()

     data_timestamp = (
         datetime.strptime(
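A hypothetical call: the server-side timeout is injected into the query header while the HTTP client waits up to twice that. The function's return value is truncated in this diff, so the sketch only awaits the call:

    # Illustrative query: fetch all defibrillator nodes; 180 s Overpass timeout,
    # 360 s client-side (timeout * 2 inside query_overpass).
    result = await query_overpass('node[emergency=defibrillator];out;', timeout=180)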
(Diff for the remaining 5 of the 18 changed files not loaded.)
