-
Notifications
You must be signed in to change notification settings - Fork 0
/
overpass.py
32 lines (24 loc) · 924 Bytes
/
overpass.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from collections.abc import Sequence
from datetime import UTC, datetime
from sentry_sdk import trace
from config import OVERPASS_API_URL
from utils import HTTP, retry_exponential
@retry_exponential(None)
@trace
async def query_overpass(query: str, *, timeout: int, must_return: bool = False) -> tuple[Sequence[dict], float]:  # noqa: ASYNC109
    """Run an Overpass query and return its elements with the data timestamp.

    The query is prefixed with ``[out:json][timeout:<timeout>]`` before being
    POSTed to ``OVERPASS_API_URL``.

    Args:
        query: Overpass query text. When it already starts with ``[``, the
            prefix is glued on directly; otherwise a ``;`` is inserted
            between the prefix and the query.
        timeout: Value written into the query's ``timeout`` setting. The HTTP
            request itself is given twice this value.
        must_return: When true, an empty ``elements`` list is treated as an
            error instead of being returned.

    Returns:
        A ``(elements, timestamp)`` pair: the ``elements`` list from the JSON
        response, and ``osm3s.timestamp_osm_base`` converted to a POSIX
        timestamp (interpreted as UTC).

    Raises:
        ValueError: If ``must_return`` is true and no elements came back.
        Whatever ``raise_for_status()`` raises for an unsuccessful response.

    NOTE(review): retry behaviour comes from ``retry_exponential(None)``;
    presumably raised errors trigger a retry - confirm against ``utils``.
    """
    # A leading '[' means the caller's query continues the settings
    # statement, so no separator is needed after our own settings.
    separator = ';' if not query.startswith('[') else ''
    payload = {'data': f'[out:json][timeout:{timeout}]{separator}{query}'}

    # The HTTP timeout is double the timeout requested in the query itself.
    response = await HTTP.post(OVERPASS_API_URL, data=payload, timeout=timeout * 2)
    response.raise_for_status()
    body = response.json()

    # The parsed timestamp is naive; tag it as UTC before converting to epoch.
    base_time = datetime.strptime(body['osm3s']['timestamp_osm_base'], '%Y-%m-%dT%H:%M:%SZ')
    base_epoch = base_time.replace(tzinfo=UTC).timestamp()

    elements = body['elements']
    if must_return and not elements:
        raise ValueError('No elements returned')
    return elements, base_epoch