-
Notifications
You must be signed in to change notification settings - Fork 38
/
app.py
54 lines (40 loc) · 1.9 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
from fastapi import FastAPI
from container import Container
from core.fastapi.error import init_error_handler
from core.fastapi.event.middleware import EventHandlerMiddleware
from core.fastapi.responses import ORJSONResponse
from core.fastapi.routes import add_routes
from sqlalchemy.orm import clear_mappers
from modules.author.usecase.addBookToAuthor import event_handler as book_domain_event_impl
from modules.author.infrastructure.persistence import mapper as author_persistence_mapper
from modules.author.infrastructure.query import mapper as author_query_mapper
from modules.author.usecase import router as author_router
from modules.author.usecase.newAuthor import api as new_author_api
from modules.book.infrastructure.persistence import mapper as book_persistence_mapper
from modules.book.infrastructure.query import mapper as book_query_mapper
from modules.book.usecase import router as book_router
from modules.book.usecase.newBook import api as new_book_api
from modules.book.usecase.addAuthor import api as add_author_api
from modules.book.usecase.deleteBook import api as delete_book_api
from modules.book.usecase.findBookByTitle import api as find_book_api
# Application object; ORJSONResponse is the default response class for routes.
app = FastAPI(default_response_class=ORJSONResponse)
add_routes([author_router, book_router], app)

# Insert Container (IoC): wire the container into each use-case API module.
container = Container()
container.wire(
    modules=[
        new_author_api,
        new_book_api,
        add_author_api,
        delete_book_api,
        find_book_api,
    ],
)
app.container = container

# Shared database handle; the startup/shutdown handlers connect and
# disconnect through this object.
db = container.db()

app.add_middleware(EventHandlerMiddleware)

# NOTE(review): the string below looks like an e-mail obfuscation placeholder
# rather than a real address -- confirm the intended value.
init_error_handler(app, '[email protected]')
@app.on_event("startup")
async def on_startup():
    """Prepare persistence when the application starts.

    Connects the shared ``db`` handle (with ``echo=True``), calls
    ``db.create_database()``, and then calls ``start_mapper()`` on every
    persistence/query mapper module of the author and book modules.
    """
    await db.connect(echo=True)
    await db.create_database()

    # Mappers are started one after another: author modules first, then book.
    mapper_modules = (
        author_persistence_mapper,
        author_query_mapper,
        book_persistence_mapper,
        book_query_mapper,
    )
    for mapper_module in mapper_modules:
        mapper_module.start_mapper()
@app.on_event("shutdown")
async def on_shutdown():
    """Tear down persistence when the application stops.

    Clears the SQLAlchemy mappers via ``clear_mappers()`` and then
    disconnects the shared ``db`` handle.

    Raises:
        Any exception raised by ``clear_mappers()`` or ``db.disconnect()``
        is propagated to the caller.
    """
    try:
        clear_mappers()
    finally:
        # Always release the database connection, even when clearing the
        # mappers fails; otherwise the connection would be left open.
        await db.disconnect()