From daf0ae508d07b6b350bd7e70d5116a433b8697af Mon Sep 17 00:00:00 2001 From: MissArticulatePython <39281408+AlyceOsbourne@users.noreply.github.com> Date: Mon, 18 Mar 2024 23:24:03 +0000 Subject: [PATCH 1/4] Add files via upload --- 2024/unit_of_work/pyproject.toml | 15 ++++ 2024/unit_of_work/unit_of_work.py | 126 ++++++++++++++++++++++++++++++ 2 files changed, 141 insertions(+) create mode 100644 2024/unit_of_work/pyproject.toml create mode 100644 2024/unit_of_work/unit_of_work.py diff --git a/2024/unit_of_work/pyproject.toml b/2024/unit_of_work/pyproject.toml new file mode 100644 index 00000000..66919d8c --- /dev/null +++ b/2024/unit_of_work/pyproject.toml @@ -0,0 +1,15 @@ +[tool.poetry] +name = "unit-of-work" +version = "0.1.0" +description = "" +authors = ["Your Name "] +readme = "README.md" + +[tool.poetry.dependencies] +python = "^3.10" +SQLAlchemy = "^2.0.28" + + +[build-system] +requires = ["poetry-core"] +build-backend = "poetry.core.masonry.api" diff --git a/2024/unit_of_work/unit_of_work.py b/2024/unit_of_work/unit_of_work.py new file mode 100644 index 00000000..dbd51db0 --- /dev/null +++ b/2024/unit_of_work/unit_of_work.py @@ -0,0 +1,126 @@ +from sqlalchemy import create_engine, Column, Integer, String, ForeignKey +from sqlalchemy.orm import sessionmaker, relationship, scoped_session, declarative_base, Mapped +from typing import Optional +import contextlib + +DATABASE_URL = "sqlite:///example.db" +engine = create_engine(DATABASE_URL) +Session = scoped_session(sessionmaker(bind = engine)) +Base = declarative_base() + + +class User(Base): + __tablename__ = 'users' + id: Mapped[int] = Column(Integer, primary_key = True) + name: Mapped[str] = Column(String, unique = True, index = True) + user_detail: Mapped['UserDetail'] = relationship('UserDetail', back_populates = 'user', uselist = False, + cascade = "all, delete-orphan") + user_preference: Mapped['UserPreference'] = relationship('UserPreference', back_populates = 'user', uselist = False, + cascade = "all, delete-orphan") + + def __repr__(self) -> str: + return f"User(id={self.id!r}, name={self.name!r})" + + +class UserDetail(Base): + __tablename__ = 'user_details' + id: Mapped[int] = Column(Integer, primary_key = True) + user_id: Mapped[int] = Column(Integer, ForeignKey('users.id'), unique = True) + details: Mapped[str] = Column(String) + user: Mapped['User'] = relationship('User', back_populates = 'user_detail') + + def __repr__(self) -> str: + return f"UserDetail(id={self.id!r}, details={self.details!r})" + + +class UserPreference(Base): + __tablename__ = 'user_preferences' + id: Mapped[int] = Column(Integer, primary_key = True) + user_id: Mapped[int] = Column(Integer, ForeignKey('users.id'), unique = True) + preference: Mapped[str] = Column(String) + user: Mapped['User'] = relationship('User', back_populates = 'user_preference') + + def __repr__(self) -> str: + return f"UserPreference(id={self.id!r}, preference={self.preference!r})" + + +def create_user(session: Session, name: str, details: Optional[str] = None, preferences: Optional[str] = None) -> User: + user = User(name = name) + if details: + user.user_detail = UserDetail(details = details) + if preferences: + user.user_preference = UserPreference(preference = preferences) + session.add(user) + return user + + +def update_user(session: Session, user_id: int, name: Optional[str] = None, details: Optional[str] = None, + preferences: Optional[str] = None) -> Optional[User]: + user: Optional[User] = session.get(User, user_id) + if not user: + return None + if name: + user.name = name + if details: + if user.user_detail: + user.user_detail.details = details + else: + user.user_detail = UserDetail(details = details) + if preferences: + if user.user_preference: + user.user_preference.preference = preferences + else: + user.user_preference = UserPreference(preference = preferences) + return user + + +def delete_user(session: Session, user_id: int) -> None: + user: Optional[User] = session.get(User, user_id) + if user: + session.delete(user) + + +def get_user(session: Session, user_id: int) -> Optional[User]: + user: Optional[User] = session.get(User, user_id) + return user + + +def get_users(session: Session) -> list[User]: + users: list[User] = session.query(User).all() + return users + + +Base.metadata.create_all(engine) + + +@contextlib.contextmanager +def unit(): + session = Session() + try: + yield session + session.commit() + except Exception: + session.rollback() + raise + finally: + session.close() + + +def main(): + with unit() as session: + user = create_user(session, "Arjan", "details", "preferences") + print(user, user.user_detail, user.user_preference) + session.flush() + user = update_user(session, user.id, "Arjan Updated", "more details", "updated preferences") + print(user, user.user_detail, user.user_preference) + user = get_user(session, user.id) + print(user) + users = get_users(session) + print(users) + delete_user(session, user.id) + users = get_users(session) + print(users) + + +if __name__ == "__main__": + main() From e29c4f88e75a92c3d11372bf13e7f7d33df40201 Mon Sep 17 00:00:00 2001 From: MissArticulatePython <39281408+AlyceOsbourne@users.noreply.github.com> Date: Wed, 20 Mar 2024 12:32:00 +0000 Subject: [PATCH 2/4] Update and rename unit_of_work.py to main.py --- 2024/unit_of_work/{unit_of_work.py => main.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename 2024/unit_of_work/{unit_of_work.py => main.py} (100%) diff --git a/2024/unit_of_work/unit_of_work.py b/2024/unit_of_work/main.py similarity index 100% rename from 2024/unit_of_work/unit_of_work.py rename to 2024/unit_of_work/main.py From 6949ea53b944f9379735cf93ff28181b20cae9ba Mon Sep 17 00:00:00 2001 From: MissArticulatePython <39281408+AlyceOsbourne@users.noreply.github.com> Date: Mon, 1 Apr 2024 19:27:37 +0100 Subject: [PATCH 3/4] Add files via upload --- 2024/unit_of_work/unit_of_work.py | 130 +++++++++++++++++++++ 2024/unit_of_work/unit_of_work_skeleton.py | 95 +++++++++++++++ 2 files changed, 225 insertions(+) create mode 100644 2024/unit_of_work/unit_of_work.py create mode 100644 2024/unit_of_work/unit_of_work_skeleton.py diff --git a/2024/unit_of_work/unit_of_work.py b/2024/unit_of_work/unit_of_work.py new file mode 100644 index 00000000..754b5c30 --- /dev/null +++ b/2024/unit_of_work/unit_of_work.py @@ -0,0 +1,130 @@ +from sqlalchemy import create_engine, Integer, String, ForeignKey +from sqlalchemy.orm import sessionmaker, relationship, scoped_session, declarative_base, Mapped, mapped_column, Session +from typing import Optional +import contextlib +import logging + +logging.basicConfig(level = logging.INFO) + +DATABASE_URL = "sqlite:///example.db" +engine = create_engine(DATABASE_URL) +SessionMaker = scoped_session(sessionmaker(bind = engine)) +Base = declarative_base() + + +class User(Base): + __tablename__ = 'users' + id: Mapped[int] = mapped_column(Integer, primary_key = True) + name: Mapped[str] = mapped_column(String) + user_detail: Mapped['UserDetail'] = relationship('UserDetail', back_populates = 'user', uselist = False, + cascade = "all, delete-orphan") + user_preference: Mapped['UserPreference'] = relationship('UserPreference', back_populates = 'user', uselist = False, + cascade = "all, delete-orphan") + + def __repr__(self) -> str: + return f"User(id={self.id!r}, name={self.name!r})" + + +class UserDetail(Base): + __tablename__ = 'user_details' + id: Mapped[int] = mapped_column(Integer, primary_key = True) + user_id: Mapped[int] = mapped_column(Integer, ForeignKey('users.id'), unique = True) + details: Mapped[str] = mapped_column(String) + user: Mapped['User'] = relationship('User', back_populates = 'user_detail') + + def __repr__(self) -> str: + return f"UserDetail(id={self.id!r}, details={self.details!r})" + + +class UserPreference(Base): + __tablename__ = 'user_preferences' + id: Mapped[int] = mapped_column(Integer, primary_key = True) + user_id: Mapped[int] = mapped_column(Integer, ForeignKey('users.id'), unique = True) + preference: Mapped[str] = mapped_column(String) + user: Mapped['User'] = relationship('User', back_populates = 'user_preference') + + def __repr__(self) -> str: + return f"UserPreference(id={self.id!r}, preference={self.preference!r})" + + +def create_user(session: Session, name: str, details: Optional[str] = None, preferences: Optional[str] = None) -> User: + user = User(name = name) + if details: + user.user_detail = UserDetail(details = details) + if preferences: + user.user_preference = UserPreference(preference = preferences) + session.add(user) + return user + + +def update_user(session: Session, user_id: int, name: Optional[str] = None, details: Optional[str] = None, + preferences: Optional[str] = None) -> Optional[User]: + user: Optional[User] = session.get(User, user_id) + if not user: + return None + if name: + user.name = name + if details: + user.user_detail.details = details + if preferences: + user.user_preference.preference = preferences + return user + + +def delete_user(session: Session, user_id: int) -> None: + user: Optional[User] = session.get(User, user_id) + if user: + session.delete(user) + + +def get_user(session: Session, user_id: int) -> Optional[User]: + user: Optional[User] = session.get(User, user_id) + return user + + +def get_users(session: Session) -> list[User]: + users: list[User] = session.query(User).all() + return users + + +Base.metadata.create_all(engine) + + +@contextlib.contextmanager +def unit(): + session = SessionMaker() + try: + yield session + session.commit() + except Exception: + session.rollback() + raise + finally: + session.close() + + +def main() -> None: + with unit() as session: + user = create_user(session, "Arjan", "details", "preferences") + print(user, user.user_detail, user.user_preference) + session.flush() + user: User = update_user(session, user.id, "Arjan Updated", "more details", "updated preferences") + + if user is None: + return + + print(user, user.user_detail, user.user_preference) + user = get_user(session, user.id) + if user is None: + return + + print(user) + users = get_users(session) + print(users) + delete_user(session, user.id) + users = get_users(session) + print(users) + + +if __name__ == "__main__": + main() diff --git a/2024/unit_of_work/unit_of_work_skeleton.py b/2024/unit_of_work/unit_of_work_skeleton.py new file mode 100644 index 00000000..9616887c --- /dev/null +++ b/2024/unit_of_work/unit_of_work_skeleton.py @@ -0,0 +1,95 @@ +import sqlite3 +import logging +from typing import List, Dict, Any, Optional + +logging.basicConfig(level=logging.INFO) + +class DBConnectionHandler: + def __init__(self, db_name: str): + self.db_name: str = db_name + self.connection: Optional[sqlite3.Connection] = None + + def __enter__(self) -> sqlite3.Connection: + self.connection = sqlite3.connect(self.db_name) + self.connection.row_factory = sqlite3.Row + return self.connection + + def __exit__(self, exc_type: Optional[type], exc_val: Optional[Exception], exc_tb: Optional[Any]) -> None: + assert self.connection is not None + self.connection.close() + +def create_tables() -> None: + with DBConnectionHandler('example.db') as connection: + cursor: sqlite3.Cursor = connection.cursor() + cursor.execute(''' + CREATE TABLE IF NOT EXISTS items ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL, + quantity INTEGER NOT NULL + ) + ''') + connection.commit() + +def drop_tables() -> None: + with DBConnectionHandler('example.db') as connection: + cursor: sqlite3.Cursor = connection.cursor() + cursor.execute('DROP TABLE IF EXISTS items') + connection.commit() + +class Repository: + def __init__(self, connection: sqlite3.Connection): + self.connection: sqlite3.Connection = connection + + def add(self, name: str, quantity: int) -> None: + cursor: sqlite3.Cursor = self.connection.cursor() + cursor.execute('INSERT INTO items (name, quantity) VALUES (?, ?)', (name, quantity)) + self.connection.commit() + + def all(self) -> List[Dict[str, Any]]: + cursor: sqlite3.Cursor = self.connection.cursor() + cursor.execute('SELECT * FROM items') + return [dict(row) for row in cursor.fetchall()] + +class UnitOfWork: + def __init__(self, db_name: str = 'example.db'): + self.db_name: str = db_name + self.connection: Optional[sqlite3.Connection] = None + self.repository: Optional[Repository] = None + + def __enter__(self) -> 'UnitOfWork': + self.connection = sqlite3.connect(self.db_name) + self.connection.execute('BEGIN') + self.connection.row_factory = sqlite3.Row + self.repository = Repository(self.connection) + return self + + def __exit__(self, exc_type: Optional[type], exc_val: Optional[Exception], exc_tb: Optional[Any]) -> None: + assert self.connection is not None + if exc_val: + self.connection.rollback() + else: + self.connection.commit() + self.connection.close() + +def main() -> None: + create_tables() + + try: + with UnitOfWork() as uow: + assert uow.repository is not None + uow.repository.add('Apple', 10) + uow.repository.add('Banana', 20) + # raise Exception("Something went wrong") + except Exception as e: + logging.error(f"Error during database operation: {e}") + + with DBConnectionHandler('example.db') as connection: + repo: Repository = Repository(connection) + logging.info("Items in the database:") + for item in repo.all(): + logging.info(item) + + drop_tables() + +if __name__ == '__main__': + main() From 4a9c1fad5196e2b87bf73632efe4201f629eea25 Mon Sep 17 00:00:00 2001 From: Arjan Egges Date: Tue, 9 Apr 2024 12:37:39 +0200 Subject: [PATCH 4/4] Updated unit of work example --- 2024/unit_of_work/main.py | 126 --------------- 2024/unit_of_work/unit_of_work.py | 145 +++++++++++------- 2024/unit_of_work/unit_of_work_basic.py | 73 +++++++++ ...skeleton.py => unit_of_work_repository.py} | 57 ++++--- 4 files changed, 199 insertions(+), 202 deletions(-) delete mode 100644 2024/unit_of_work/main.py create mode 100644 2024/unit_of_work/unit_of_work_basic.py rename 2024/unit_of_work/{unit_of_work_skeleton.py => unit_of_work_repository.py} (66%) diff --git a/2024/unit_of_work/main.py b/2024/unit_of_work/main.py deleted file mode 100644 index dbd51db0..00000000 --- a/2024/unit_of_work/main.py +++ /dev/null @@ -1,126 +0,0 @@ -from sqlalchemy import create_engine, Column, Integer, String, ForeignKey -from sqlalchemy.orm import sessionmaker, relationship, scoped_session, declarative_base, Mapped -from typing import Optional -import contextlib - -DATABASE_URL = "sqlite:///example.db" -engine = create_engine(DATABASE_URL) -Session = scoped_session(sessionmaker(bind = engine)) -Base = declarative_base() - - -class User(Base): - __tablename__ = 'users' - id: Mapped[int] = Column(Integer, primary_key = True) - name: Mapped[str] = Column(String, unique = True, index = True) - user_detail: Mapped['UserDetail'] = relationship('UserDetail', back_populates = 'user', uselist = False, - cascade = "all, delete-orphan") - user_preference: Mapped['UserPreference'] = relationship('UserPreference', back_populates = 'user', uselist = False, - cascade = "all, delete-orphan") - - def __repr__(self) -> str: - return f"User(id={self.id!r}, name={self.name!r})" - - -class UserDetail(Base): - __tablename__ = 'user_details' - id: Mapped[int] = Column(Integer, primary_key = True) - user_id: Mapped[int] = Column(Integer, ForeignKey('users.id'), unique = True) - details: Mapped[str] = Column(String) - user: Mapped['User'] = relationship('User', back_populates = 'user_detail') - - def __repr__(self) -> str: - return f"UserDetail(id={self.id!r}, details={self.details!r})" - - -class UserPreference(Base): - __tablename__ = 'user_preferences' - id: Mapped[int] = Column(Integer, primary_key = True) - user_id: Mapped[int] = Column(Integer, ForeignKey('users.id'), unique = True) - preference: Mapped[str] = Column(String) - user: Mapped['User'] = relationship('User', back_populates = 'user_preference') - - def __repr__(self) -> str: - return f"UserPreference(id={self.id!r}, preference={self.preference!r})" - - -def create_user(session: Session, name: str, details: Optional[str] = None, preferences: Optional[str] = None) -> User: - user = User(name = name) - if details: - user.user_detail = UserDetail(details = details) - if preferences: - user.user_preference = UserPreference(preference = preferences) - session.add(user) - return user - - -def update_user(session: Session, user_id: int, name: Optional[str] = None, details: Optional[str] = None, - preferences: Optional[str] = None) -> Optional[User]: - user: Optional[User] = session.get(User, user_id) - if not user: - return None - if name: - user.name = name - if details: - if user.user_detail: - user.user_detail.details = details - else: - user.user_detail = UserDetail(details = details) - if preferences: - if user.user_preference: - user.user_preference.preference = preferences - else: - user.user_preference = UserPreference(preference = preferences) - return user - - -def delete_user(session: Session, user_id: int) -> None: - user: Optional[User] = session.get(User, user_id) - if user: - session.delete(user) - - -def get_user(session: Session, user_id: int) -> Optional[User]: - user: Optional[User] = session.get(User, user_id) - return user - - -def get_users(session: Session) -> list[User]: - users: list[User] = session.query(User).all() - return users - - -Base.metadata.create_all(engine) - - -@contextlib.contextmanager -def unit(): - session = Session() - try: - yield session - session.commit() - except Exception: - session.rollback() - raise - finally: - session.close() - - -def main(): - with unit() as session: - user = create_user(session, "Arjan", "details", "preferences") - print(user, user.user_detail, user.user_preference) - session.flush() - user = update_user(session, user.id, "Arjan Updated", "more details", "updated preferences") - print(user, user.user_detail, user.user_preference) - user = get_user(session, user.id) - print(user) - users = get_users(session) - print(users) - delete_user(session, user.id) - users = get_users(session) - print(users) - - -if __name__ == "__main__": - main() diff --git a/2024/unit_of_work/unit_of_work.py b/2024/unit_of_work/unit_of_work.py index 754b5c30..2bcde1e5 100644 --- a/2024/unit_of_work/unit_of_work.py +++ b/2024/unit_of_work/unit_of_work.py @@ -1,84 +1,115 @@ -from sqlalchemy import create_engine, Integer, String, ForeignKey -from sqlalchemy.orm import sessionmaker, relationship, scoped_session, declarative_base, Mapped, mapped_column, Session -from typing import Optional import contextlib -import logging - -logging.basicConfig(level = logging.INFO) +from typing import Optional -DATABASE_URL = "sqlite:///example.db" +from sqlalchemy import ForeignKey, String, create_engine +from sqlalchemy.orm import ( + Mapped, + declarative_base, + mapped_column, + relationship, + scoped_session, + sessionmaker, +) + +DATABASE_URL = "sqlite:///:memory:" engine = create_engine(DATABASE_URL) -SessionMaker = scoped_session(sessionmaker(bind = engine)) +Session = scoped_session(sessionmaker(bind=engine)) Base = declarative_base() class User(Base): - __tablename__ = 'users' - id: Mapped[int] = mapped_column(Integer, primary_key = True) - name: Mapped[str] = mapped_column(String) - user_detail: Mapped['UserDetail'] = relationship('UserDetail', back_populates = 'user', uselist = False, - cascade = "all, delete-orphan") - user_preference: Mapped['UserPreference'] = relationship('UserPreference', back_populates = 'user', uselist = False, - cascade = "all, delete-orphan") + __tablename__ = "users" + id: Mapped[int] = mapped_column( + primary_key=True, index=True, unique=True, autoincrement=True + ) + name: Mapped[str] = mapped_column() + user_detail: Mapped["UserDetail"] = relationship( + "UserDetail", back_populates="user", uselist=False, cascade="all, delete-orphan" + ) + user_preference: Mapped["UserPreference"] = relationship( + "UserPreference", + back_populates="user", + uselist=False, + cascade="all, delete-orphan", + ) def __repr__(self) -> str: return f"User(id={self.id!r}, name={self.name!r})" class UserDetail(Base): - __tablename__ = 'user_details' - id: Mapped[int] = mapped_column(Integer, primary_key = True) - user_id: Mapped[int] = mapped_column(Integer, ForeignKey('users.id'), unique = True) - details: Mapped[str] = mapped_column(String) - user: Mapped['User'] = relationship('User', back_populates = 'user_detail') + __tablename__ = "user_details" + id: Mapped[int] = mapped_column(primary_key=True) + user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), unique=True) + details: Mapped[str] = mapped_column() + user: Mapped["User"] = relationship("User", back_populates="user_detail") def __repr__(self) -> str: return f"UserDetail(id={self.id!r}, details={self.details!r})" class UserPreference(Base): - __tablename__ = 'user_preferences' - id: Mapped[int] = mapped_column(Integer, primary_key = True) - user_id: Mapped[int] = mapped_column(Integer, ForeignKey('users.id'), unique = True) + __tablename__ = "user_preferences" + id: Mapped[int] = mapped_column(primary_key=True) + user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), unique=True) preference: Mapped[str] = mapped_column(String) - user: Mapped['User'] = relationship('User', back_populates = 'user_preference') + user: Mapped["User"] = relationship("User", back_populates="user_preference") def __repr__(self) -> str: return f"UserPreference(id={self.id!r}, preference={self.preference!r})" -def create_user(session: Session, name: str, details: Optional[str] = None, preferences: Optional[str] = None) -> User: - user = User(name = name) +def create_user( + session: Session, + name: str, + details: Optional[str] = None, + preferences: Optional[str] = None, +) -> User: + user = User(name=name) if details: - user.user_detail = UserDetail(details = details) + user.user_detail = UserDetail(details=details) if preferences: - user.user_preference = UserPreference(preference = preferences) + user.user_preference = UserPreference(preference=preferences) session.add(user) return user -def update_user(session: Session, user_id: int, name: Optional[str] = None, details: Optional[str] = None, - preferences: Optional[str] = None) -> Optional[User]: +def update_user( + session: Session, + user_id: int, + name: Optional[str] = None, + details: Optional[str] = None, + preferences: Optional[str] = None, +) -> User: user: Optional[User] = session.get(User, user_id) if not user: - return None + raise ValueError(f"User {user_id} not found") if name: user.name = name if details: - user.user_detail.details = details + if user.user_detail: + user.user_detail.details = details + else: + user.user_detail = UserDetail(details=details) if preferences: - user.user_preference.preference = preferences + if user.user_preference: + user.user_preference.preference = preferences + else: + user.user_preference = UserPreference(preference=preferences) return user def delete_user(session: Session, user_id: int) -> None: user: Optional[User] = session.get(User, user_id) - if user: - session.delete(user) + if not user: + raise ValueError(f"User {user_id} not found") + session.delete(user) -def get_user(session: Session, user_id: int) -> Optional[User]: - user: Optional[User] = session.get(User, user_id) +def get_user(session: Session, user_id: int) -> User: + user = session.get(User, user_id) + if not user: + raise ValueError(f"User {user_id} not found") return user @@ -92,36 +123,36 @@ def get_users(session: Session) -> list[User]: @contextlib.contextmanager def unit(): - session = SessionMaker() + session = Session() try: yield session session.commit() - except Exception: + except Exception as e: + print("Rolling back") session.rollback() - raise + raise e finally: session.close() def main() -> None: - with unit() as session: - user = create_user(session, "Arjan", "details", "preferences") - print(user, user.user_detail, user.user_preference) - session.flush() - user: User = update_user(session, user.id, "Arjan Updated", "more details", "updated preferences") - - if user is None: - return - - print(user, user.user_detail, user.user_preference) - user = get_user(session, user.id) - if user is None: - return + try: + with unit() as session: + user = create_user(session, "Arjan", "details", "preferences") + print(user, user.user_detail, user.user_preference) + session.flush() + print(user) + user = update_user( + session, 1, "Arjan Updated", "more details", "updated preferences" + ) + print(user, user.user_detail, user.user_preference) + user = get_user(session, user.id) + print(user) + delete_user(session, 1) + except ValueError as e: + print(e) - print(user) - users = get_users(session) - print(users) - delete_user(session, user.id) + with unit() as session: users = get_users(session) print(users) diff --git a/2024/unit_of_work/unit_of_work_basic.py b/2024/unit_of_work/unit_of_work_basic.py new file mode 100644 index 00000000..b8ad1479 --- /dev/null +++ b/2024/unit_of_work/unit_of_work_basic.py @@ -0,0 +1,73 @@ +from dataclasses import dataclass, field + + +@dataclass +class User: + username: str + + +@dataclass +class UnitOfWork: + new_users: list = field(default_factory=list) + dirty_users: list = field(default_factory=list) + removed_users: list = field(default_factory=list) + + def register_new(self, user: User) -> None: + self.new_users.append(user) + + def register_dirty(self, user: User) -> None: + if user not in self.dirty_users: + self.dirty_users.append(user) + + def register_removed(self, user: User) -> None: + self.removed_users.append(user) + + def commit(self) -> None: + self.insert_new() + self.update_dirty() + self.delete_removed() + + def insert_new(self): + for obj in self.new_users: + # Insert new object into the database + print(f"Inserting {obj}") + # For demonstration, pretend we insert into a database here + self.new_users.clear() + + def update_dirty(self): + for obj in self.dirty_users: + # Update existing object in the database + print(f"Updating {obj}") + # For demonstration, pretend we update in a database here + self.dirty_users.clear() + + def delete_removed(self): + for obj in self.removed_users: + # Remove object from the database + print(f"Deleting {obj}") + # For demonstration, pretend we delete from a database here + self.removed_users.clear() + + +def main() -> None: + # Creating a new Unit of Work + uow = UnitOfWork() + + # Creating a new user + new_user = User("john_doe") + uow.register_new(new_user) + + # Simulate updating a user + existing_user = User("existing_user") + uow.register_dirty(existing_user) + + # Simulate removing a user + removed_user = User("removed_user") + uow.register_removed(removed_user) + + # Committing changes + uow.commit() + + +if __name__ == "__main__": + main() diff --git a/2024/unit_of_work/unit_of_work_skeleton.py b/2024/unit_of_work/unit_of_work_repository.py similarity index 66% rename from 2024/unit_of_work/unit_of_work_skeleton.py rename to 2024/unit_of_work/unit_of_work_repository.py index 9616887c..5dec886e 100644 --- a/2024/unit_of_work/unit_of_work_skeleton.py +++ b/2024/unit_of_work/unit_of_work_repository.py @@ -1,9 +1,10 @@ -import sqlite3 import logging -from typing import List, Dict, Any, Optional +import sqlite3 +from typing import Any, Optional logging.basicConfig(level=logging.INFO) + class DBConnectionHandler: def __init__(self, db_name: str): self.db_name: str = db_name @@ -14,56 +15,72 @@ def __enter__(self) -> sqlite3.Connection: self.connection.row_factory = sqlite3.Row return self.connection - def __exit__(self, exc_type: Optional[type], exc_val: Optional[Exception], exc_tb: Optional[Any]) -> None: + def __exit__( + self, + exc_type: Optional[type], + exc_val: Optional[Exception], + exc_tb: Optional[Any], + ) -> None: assert self.connection is not None self.connection.close() + def create_tables() -> None: - with DBConnectionHandler('example.db') as connection: + with DBConnectionHandler("example.db") as connection: cursor: sqlite3.Cursor = connection.cursor() - cursor.execute(''' + cursor.execute(""" CREATE TABLE IF NOT EXISTS items ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, quantity INTEGER NOT NULL ) - ''') + """) connection.commit() + def drop_tables() -> None: - with DBConnectionHandler('example.db') as connection: + with DBConnectionHandler("example.db") as connection: cursor: sqlite3.Cursor = connection.cursor() - cursor.execute('DROP TABLE IF EXISTS items') + cursor.execute("DROP TABLE IF EXISTS items") connection.commit() + class Repository: def __init__(self, connection: sqlite3.Connection): self.connection: sqlite3.Connection = connection def add(self, name: str, quantity: int) -> None: cursor: sqlite3.Cursor = self.connection.cursor() - cursor.execute('INSERT INTO items (name, quantity) VALUES (?, ?)', (name, quantity)) + cursor.execute( + "INSERT INTO items (name, quantity) VALUES (?, ?)", (name, quantity) + ) self.connection.commit() - def all(self) -> List[Dict[str, Any]]: + def all(self) -> list[dict[str, Any]]: cursor: sqlite3.Cursor = self.connection.cursor() - cursor.execute('SELECT * FROM items') + cursor.execute("SELECT * FROM items") return [dict(row) for row in cursor.fetchall()] + class UnitOfWork: - def __init__(self, db_name: str = 'example.db'): + def __init__(self, db_name: str = "example.db"): self.db_name: str = db_name self.connection: Optional[sqlite3.Connection] = None self.repository: Optional[Repository] = None - def __enter__(self) -> 'UnitOfWork': + def __enter__(self) -> "UnitOfWork": self.connection = sqlite3.connect(self.db_name) - self.connection.execute('BEGIN') + self.connection.execute("BEGIN") self.connection.row_factory = sqlite3.Row self.repository = Repository(self.connection) return self - def __exit__(self, exc_type: Optional[type], exc_val: Optional[Exception], exc_tb: Optional[Any]) -> None: + def __exit__( + self, + exc_type: Optional[type], + exc_val: Optional[Exception], + exc_tb: Optional[Any], + ) -> None: assert self.connection is not None if exc_val: self.connection.rollback() @@ -71,19 +88,20 @@ def __exit__(self, exc_type: Optional[type], exc_val: Optional[Exception], exc_t self.connection.commit() self.connection.close() + def main() -> None: create_tables() try: with UnitOfWork() as uow: assert uow.repository is not None - uow.repository.add('Apple', 10) - uow.repository.add('Banana', 20) + uow.repository.add("Apple", 10) + uow.repository.add("Banana", 20) # raise Exception("Something went wrong") except Exception as e: logging.error(f"Error during database operation: {e}") - with DBConnectionHandler('example.db') as connection: + with DBConnectionHandler("example.db") as connection: repo: Repository = Repository(connection) logging.info("Items in the database:") for item in repo.all(): @@ -91,5 +109,6 @@ def main() -> None: drop_tables() -if __name__ == '__main__': + +if __name__ == "__main__": main()