diff --git a/2024/asyncio_dive/async_api.py b/2024/asyncio_dive/async_api.py new file mode 100644 index 0000000..d787250 --- /dev/null +++ b/2024/asyncio_dive/async_api.py @@ -0,0 +1,202 @@ +import asyncio +import json +import logging +from typing import Any, Awaitable, Callable, Iterable, Optional + +import aiosqlite + +logging.basicConfig(level=logging.INFO) + + +class Router: + def __init__(self): + self.routes: dict[str, Callable[..., Awaitable[Any]]] = {} + + def route( + self, path: str, method: str = "GET" + ) -> Callable[[Callable[..., Awaitable[Any]]], Callable[..., Awaitable[Any]]]: + def decorator( + func: Callable[..., Awaitable[Any]], + ) -> Callable[..., Awaitable[Any]]: + self.routes[f"{method} {path}"] = func + return func + + return decorator + + async def handle( + self, method: str, path: str, data: Optional[dict[str, Any]] = None + ) -> str: + logging.info(f"Handling {method} {path} {data}") + handler = self.routes.get(f"{method} {path}") + if handler: + if data is not None: + data = await handler(data) + else: + data = await handler() + response_body = json.dumps(data) + response_header = "HTTP/1.1 200 OK\nContent-Type: application/json\n\n" + return response_header + response_body + else: + return ( + "HTTP/1.1 404 Not Found\nContent-Type: application/json\n\n" + + json.dumps({"error": "Not Found"}) + ) + + +class AsyncAPIServer: + def __init__( + self, host: str = "127.0.0.1", port: int = 3000, router: Optional[Router] = None + ): + self.host = host + self.port = port + self.router = router if router is not None else Router() + + async def start(self) -> None: + server = await asyncio.start_server( + self.accept_connections, self.host, self.port + ) + addr = server.sockets[0].getsockname() + logging.info(f"Serving on http://{addr[0]}:{addr[1]}") + + async with server: + await server.serve_forever() + + async def accept_connections( + self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter + ) -> None: + addr = writer.get_extra_info("peername") + logging.info(f"Connected by {addr}") + request_handler = AsyncAPIRequestHandler(reader, writer, self.router) + await request_handler.process_request() + + def route( + self, path: str, method: str = "GET" + ) -> Callable[..., Callable[..., Awaitable[Any]]]: + return self.router.route(path, method) + + +class AsyncAPIRequestHandler: + def __init__( + self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, router: Router + ): + self.reader = reader + self.writer = writer + self.router = router + + async def process_request(self) -> None: + request_line = await self.reader.readline() + request_line = request_line.decode("utf-8").strip() + logging.info(f"Request: {request_line}") + method, path, _ = request_line.split(" ") + + if method in ["POST", "PUT", "DELETE"]: + content_length = await self.read_headers() + if content_length: + body = await self.reader.readexactly(content_length) + data = json.loads(body.decode("utf-8")) + response = await self.router.handle(method, path, data=data) + else: + response = ( + "HTTP/1.1 400 Bad Request\nContent-Type: application/json\n\n" + + json.dumps({"error": "Bad Request"}) + ) + else: + response = await self.router.handle(method, path) + + self.writer.write(response.encode()) + await self.writer.drain() + self.writer.close() + + async def read_headers(self) -> Optional[int]: + content_length = None + while True: + line = await self.reader.readline() + if line == b"\r\n": # End of headers + break + header = line.decode("utf-8").strip() + if header.lower().startswith("content-length"): + content_length = int(header.split(":")[1].strip()) + return content_length + + +api = AsyncAPIServer() + + +@api.route("/books") +async def get_books() -> dict[str, Iterable[aiosqlite.Row]]: + async with aiosqlite.connect("book.db") as conn: + async with conn.execute("SELECT * FROM books") as cursor: + books = await cursor.fetchall() + return {"books": books} + + +@api.route("/movies") +async def get_movies() -> dict[str, Iterable[aiosqlite.Row]]: + async with aiosqlite.connect("movie.db") as conn: + async with conn.execute("SELECT * FROM movies") as cursor: + movies = await cursor.fetchall() + return {"movies": movies} + + +@api.route("/books", "POST") +async def add_book(data: dict[str, Any]) -> dict[str, str]: + async with aiosqlite.connect("book.db") as conn: + await conn.execute( + "INSERT INTO books (title, author) VALUES (?, ?)", + (data["title"], data["author"]), + ) + await conn.commit() + return {"message": "Book added"} + + +@api.route("/movies", "POST") +async def add_movie(data: dict[str, Any]) -> dict[str, str]: + async with aiosqlite.connect("movie.db") as conn: + await conn.execute( + "INSERT INTO movies (title, director) VALUES (?, ?)", + (data["title"], data["director"]), + ) + await conn.commit() + return {"message": "Movie added"} + + +@api.route("/books", "DELETE") +async def delete_book(data: dict[str, Any]) -> dict[str, str]: + async with aiosqlite.connect("book.db") as conn: + await conn.execute("DELETE FROM books WHERE title = ?", (data["title"],)) + await conn.commit() + return {"message": "Book deleted"} + + +@api.route("/movies", "DELETE") +async def delete_movie(data: dict[str, Any]) -> dict[str, str]: + async with aiosqlite.connect("movie.db") as conn: + await conn.execute("DELETE FROM movies WHERE title = ?", (data["title"],)) + await conn.commit() + return {"message": "Movie deleted"} + + +async def create_table(statement: str, db_path: str) -> None: + async with aiosqlite.connect(db_path) as db: + await db.execute(statement) + await db.commit() + + +async def main() -> None: + async with asyncio.TaskGroup() as tasks: + tasks.create_task( + create_table( + "CREATE TABLE IF NOT EXISTS books (title TEXT, author TEXT)", "book.db" + ) + ) + tasks.create_task( + create_table( + "CREATE TABLE IF NOT EXISTS movies (title TEXT, director TEXT)", + "movie.db", + ) + ) + await api.start() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/2024/asyncio_dive/async_api_client.py b/2024/asyncio_dive/async_api_client.py new file mode 100644 index 0000000..90c2371 --- /dev/null +++ b/2024/asyncio_dive/async_api_client.py @@ -0,0 +1,61 @@ +import asyncio +import json +import logging + +import aiohttp + +logging.basicConfig(level=logging.INFO) + + +async def fetch_books(session: aiohttp.ClientSession) -> str: + async with session.get("http://127.0.0.1:3000/books") as response: + return await response.text() + + +async def add_book(session: aiohttp.ClientSession, title: str, author: str) -> str: + async with session.post( + "http://127.0.0.1:3000/books", + data=json.dumps({"title": title, "author": author}), + ) as response: + return await response.text() + + +async def delete_book(session: aiohttp.ClientSession, title: str) -> str: + async with session.delete( + "http://127.0.0.1:3000/books", data=json.dumps({"title": title}) + ) as response: + return await response.text() + + +async def add_movie(session: aiohttp.ClientSession, title: str, director: str) -> str: + async with session.post( + "http://127.0.0.1:3000/movies", + data=json.dumps({"title": title, "director": director}), + ) as response: + return await response.text() + + +async def delete_movie(session: aiohttp.ClientSession, title: str) -> str: + async with session.delete( + "http://127.0.0.1:3000/movies", data=json.dumps({"title": title}) + ) as response: + return await response.text() + + +async def main() -> None: + async with aiohttp.ClientSession() as session: + batch = [ + fetch_books(session), + add_book(session, "The Catcher in the Rye", "J.D. Salinger"), + add_book(session, "1984", "George Orwell"), + add_movie(session, "The Godfather", "Francis Ford Coppola"), + delete_book(session, "1984"), + delete_movie(session, "The Godfather"), + ] + results = await asyncio.gather(*batch) + for result in results: + logging.info(result) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/2024/asyncio_dive/async_server.py b/2024/asyncio_dive/async_server.py new file mode 100644 index 0000000..550d42a --- /dev/null +++ b/2024/asyncio_dive/async_server.py @@ -0,0 +1,77 @@ +import asyncio +import logging + +import aiofiles + + +class AsyncServer: + def __init__(self, host: str = "127.0.0.1", port: int = 3000): + self.host = host + self.port = port + + async def start(self) -> None: + server = await asyncio.start_server( + self.accept_connections, self.host, self.port + ) + addr = server.sockets[0].getsockname() + logging.info(f"Server started at http://{addr[0]}:{addr[1]}") + + async with server: + await server.serve_forever() + + async def accept_connections( + self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter + ) -> None: + addr = writer.get_extra_info("peername") + logging.info(f"Connected by {addr}") + request_handler = AsyncRequestHandler(reader, writer) + await request_handler.process_request() + + +class AsyncRequestHandler: + def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + self.reader = reader + self.writer = writer + + async def process_request(self) -> None: + request = await self.reader.read(1024) + request = request.decode("utf-8") + logging.info(f"Request: {request}") + await self.handle_request(request) + + async def handle_request(self, request: str) -> None: + path = self.get_path(request) + response = await self.generate_response(path) + self.writer.write(response.encode()) + await self.writer.drain() + self.writer.close() + + def get_path(self, request: str) -> str: + try: + path = request.split(" ")[1] + if path == "/": + return "index.html" + return path + except IndexError: + return "index.html" + + async def generate_response(self, path: str) -> str: + await asyncio.sleep(2) + try: + async with aiofiles.open(path, mode="r") as f: + response_body = await f.read() + response_header = "HTTP/1.1 200 OK\nContent-Type: text/html\n\n" + except FileNotFoundError: + response_body = "