Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/2024-asyncio-dive'
Browse files Browse the repository at this point in the history
  • Loading branch information
Arjan Egges committed May 1, 2024
2 parents d23fd64 + ccf8bea commit d31e297
Show file tree
Hide file tree
Showing 8 changed files with 501 additions and 0 deletions.
202 changes: 202 additions & 0 deletions 2024/asyncio_dive/async_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
import asyncio
import json
import logging
from typing import Any, Awaitable, Callable, Iterable, Optional

import aiosqlite

logging.basicConfig(level=logging.INFO)


class Router:
def __init__(self):
self.routes: dict[str, Callable[..., Awaitable[Any]]] = {}

def route(
self, path: str, method: str = "GET"
) -> Callable[[Callable[..., Awaitable[Any]]], Callable[..., Awaitable[Any]]]:
def decorator(
func: Callable[..., Awaitable[Any]],
) -> Callable[..., Awaitable[Any]]:
self.routes[f"{method} {path}"] = func
return func

return decorator

async def handle(
self, method: str, path: str, data: Optional[dict[str, Any]] = None
) -> str:
logging.info(f"Handling {method} {path} {data}")
handler = self.routes.get(f"{method} {path}")
if handler:
if data is not None:
data = await handler(data)
else:
data = await handler()
response_body = json.dumps(data)
response_header = "HTTP/1.1 200 OK\nContent-Type: application/json\n\n"
return response_header + response_body
else:
return (
"HTTP/1.1 404 Not Found\nContent-Type: application/json\n\n"
+ json.dumps({"error": "Not Found"})
)


class AsyncAPIServer:
def __init__(
self, host: str = "127.0.0.1", port: int = 3000, router: Optional[Router] = None
):
self.host = host
self.port = port
self.router = router if router is not None else Router()

async def start(self) -> None:
server = await asyncio.start_server(
self.accept_connections, self.host, self.port
)
addr = server.sockets[0].getsockname()
logging.info(f"Serving on http://{addr[0]}:{addr[1]}")

async with server:
await server.serve_forever()

async def accept_connections(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None:
addr = writer.get_extra_info("peername")
logging.info(f"Connected by {addr}")
request_handler = AsyncAPIRequestHandler(reader, writer, self.router)
await request_handler.process_request()

def route(
self, path: str, method: str = "GET"
) -> Callable[..., Callable[..., Awaitable[Any]]]:
return self.router.route(path, method)


class AsyncAPIRequestHandler:
def __init__(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, router: Router
):
self.reader = reader
self.writer = writer
self.router = router

async def process_request(self) -> None:
request_line = await self.reader.readline()
request_line = request_line.decode("utf-8").strip()
logging.info(f"Request: {request_line}")
method, path, _ = request_line.split(" ")

if method in ["POST", "PUT", "DELETE"]:
content_length = await self.read_headers()
if content_length:
body = await self.reader.readexactly(content_length)
data = json.loads(body.decode("utf-8"))
response = await self.router.handle(method, path, data=data)
else:
response = (
"HTTP/1.1 400 Bad Request\nContent-Type: application/json\n\n"
+ json.dumps({"error": "Bad Request"})
)
else:
response = await self.router.handle(method, path)

self.writer.write(response.encode())
await self.writer.drain()
self.writer.close()

async def read_headers(self) -> Optional[int]:
content_length = None
while True:
line = await self.reader.readline()
if line == b"\r\n": # End of headers
break
header = line.decode("utf-8").strip()
if header.lower().startswith("content-length"):
content_length = int(header.split(":")[1].strip())
return content_length


api = AsyncAPIServer()


@api.route("/books")
async def get_books() -> dict[str, Iterable[aiosqlite.Row]]:
async with aiosqlite.connect("book.db") as conn:
async with conn.execute("SELECT * FROM books") as cursor:
books = await cursor.fetchall()
return {"books": books}


@api.route("/movies")
async def get_movies() -> dict[str, Iterable[aiosqlite.Row]]:
async with aiosqlite.connect("movie.db") as conn:
async with conn.execute("SELECT * FROM movies") as cursor:
movies = await cursor.fetchall()
return {"movies": movies}


@api.route("/books", "POST")
async def add_book(data: dict[str, Any]) -> dict[str, str]:
async with aiosqlite.connect("book.db") as conn:
await conn.execute(
"INSERT INTO books (title, author) VALUES (?, ?)",
(data["title"], data["author"]),
)
await conn.commit()
return {"message": "Book added"}


@api.route("/movies", "POST")
async def add_movie(data: dict[str, Any]) -> dict[str, str]:
async with aiosqlite.connect("movie.db") as conn:
await conn.execute(
"INSERT INTO movies (title, director) VALUES (?, ?)",
(data["title"], data["director"]),
)
await conn.commit()
return {"message": "Movie added"}


@api.route("/books", "DELETE")
async def delete_book(data: dict[str, Any]) -> dict[str, str]:
async with aiosqlite.connect("book.db") as conn:
await conn.execute("DELETE FROM books WHERE title = ?", (data["title"],))
await conn.commit()
return {"message": "Book deleted"}


@api.route("/movies", "DELETE")
async def delete_movie(data: dict[str, Any]) -> dict[str, str]:
async with aiosqlite.connect("movie.db") as conn:
await conn.execute("DELETE FROM movies WHERE title = ?", (data["title"],))
await conn.commit()
return {"message": "Movie deleted"}


async def create_table(statement: str, db_path: str) -> None:
async with aiosqlite.connect(db_path) as db:
await db.execute(statement)
await db.commit()


async def main() -> None:
async with asyncio.TaskGroup() as tasks:
tasks.create_task(
create_table(
"CREATE TABLE IF NOT EXISTS books (title TEXT, author TEXT)", "book.db"
)
)
tasks.create_task(
create_table(
"CREATE TABLE IF NOT EXISTS movies (title TEXT, director TEXT)",
"movie.db",
)
)
await api.start()


if __name__ == "__main__":
asyncio.run(main())
61 changes: 61 additions & 0 deletions 2024/asyncio_dive/async_api_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import asyncio
import json
import logging

import aiohttp

logging.basicConfig(level=logging.INFO)


async def fetch_books(session: aiohttp.ClientSession) -> str:
async with session.get("http://127.0.0.1:3000/books") as response:
return await response.text()


async def add_book(session: aiohttp.ClientSession, title: str, author: str) -> str:
async with session.post(
"http://127.0.0.1:3000/books",
data=json.dumps({"title": title, "author": author}),
) as response:
return await response.text()


async def delete_book(session: aiohttp.ClientSession, title: str) -> str:
async with session.delete(
"http://127.0.0.1:3000/books", data=json.dumps({"title": title})
) as response:
return await response.text()


async def add_movie(session: aiohttp.ClientSession, title: str, director: str) -> str:
async with session.post(
"http://127.0.0.1:3000/movies",
data=json.dumps({"title": title, "director": director}),
) as response:
return await response.text()


async def delete_movie(session: aiohttp.ClientSession, title: str) -> str:
async with session.delete(
"http://127.0.0.1:3000/movies", data=json.dumps({"title": title})
) as response:
return await response.text()


async def main() -> None:
async with aiohttp.ClientSession() as session:
batch = [
fetch_books(session),
add_book(session, "The Catcher in the Rye", "J.D. Salinger"),
add_book(session, "1984", "George Orwell"),
add_movie(session, "The Godfather", "Francis Ford Coppola"),
delete_book(session, "1984"),
delete_movie(session, "The Godfather"),
]
results = await asyncio.gather(*batch)
for result in results:
logging.info(result)


if __name__ == "__main__":
asyncio.run(main())
77 changes: 77 additions & 0 deletions 2024/asyncio_dive/async_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import asyncio
import logging

import aiofiles


class AsyncServer:
def __init__(self, host: str = "127.0.0.1", port: int = 3000):
self.host = host
self.port = port

async def start(self) -> None:
server = await asyncio.start_server(
self.accept_connections, self.host, self.port
)
addr = server.sockets[0].getsockname()
logging.info(f"Server started at http://{addr[0]}:{addr[1]}")

async with server:
await server.serve_forever()

async def accept_connections(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None:
addr = writer.get_extra_info("peername")
logging.info(f"Connected by {addr}")
request_handler = AsyncRequestHandler(reader, writer)
await request_handler.process_request()


class AsyncRequestHandler:
def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
self.reader = reader
self.writer = writer

async def process_request(self) -> None:
request = await self.reader.read(1024)
request = request.decode("utf-8")
logging.info(f"Request: {request}")
await self.handle_request(request)

async def handle_request(self, request: str) -> None:
path = self.get_path(request)
response = await self.generate_response(path)
self.writer.write(response.encode())
await self.writer.drain()
self.writer.close()

def get_path(self, request: str) -> str:
try:
path = request.split(" ")[1]
if path == "/":
return "index.html"
return path
except IndexError:
return "index.html"

async def generate_response(self, path: str) -> str:
await asyncio.sleep(2)
try:
async with aiofiles.open(path, mode="r") as f:
response_body = await f.read()
response_header = "HTTP/1.1 200 OK\nContent-Type: text/html\n\n"
except FileNotFoundError:
response_body = "<html><body><h1>404 Not Found</h1></body></html>"
response_header = "HTTP/1.1 404 Not Found\n\n"
return response_header + response_body


async def main() -> None:
logging.basicConfig(level=logging.INFO)
server = AsyncServer()
await server.start()


if __name__ == "__main__":
asyncio.run(main())
22 changes: 22 additions & 0 deletions 2024/asyncio_dive/asyncio_explicit_loop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import asyncio


async def do_io():
print("io start")
await asyncio.sleep(2)
print("io end")


async def do_other_things():
print("doing other things")


def main() -> None:
loop = asyncio.get_event_loop()
loop.run_until_complete(do_io())
loop.run_until_complete(do_other_things())
loop.close()


if __name__ == "__main__":
main()
27 changes: 27 additions & 0 deletions 2024/asyncio_dive/index.html

Large diffs are not rendered by default.

16 changes: 16 additions & 0 deletions 2024/asyncio_dive/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[tool.poetry]
name = "async-deep-dive"
version = "0.1.0"
description = ""
authors = ["ArjanCodes"]

[tool.poetry.dependencies]
python = "^3.12"
aiofiles = "^23.2.1"
aiosqlite = "^0.17.0"
aiohttp = "^3.8.1"


[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
Loading

0 comments on commit d31e297

Please sign in to comment.