Skip to content

Commit

Permalink
Merge branch 'master' into mi
Browse files Browse the repository at this point in the history
  • Loading branch information
bpepple authored Sep 14, 2024
2 parents 9000094 + 1ff781b commit 84b5b16
Show file tree
Hide file tree
Showing 16 changed files with 759 additions and 645 deletions.
8 changes: 0 additions & 8 deletions .github/workflows/ruff.yml

This file was deleted.

2 changes: 1 addition & 1 deletion darkseid/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""Projects version."""

__version__ = "4.0.5"
__version__ = "4.4.0"
3 changes: 3 additions & 0 deletions darkseid/archivers/archiver.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import re
from typing import TYPE_CHECKING

if TYPE_CHECKING:
Expand All @@ -13,6 +14,8 @@ class Archiver:
This class provides methods for reading, writing, and removing files within an archive.
"""

IMAGE_EXT_RE = re.compile(r"\.(jpe?g|png|webp|gif)$", re.IGNORECASE)

def __init__(self: Archiver, path: Path) -> None:
"""
Initializes an Archiver object with a specified path.
Expand Down
122 changes: 53 additions & 69 deletions darkseid/archivers/zip.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
from __future__ import annotations

import logging
import shutil
import tempfile
import zipfile
from typing import TYPE_CHECKING
from zipfile import ZIP_DEFLATED, ZIP_STORED, BadZipfile, ZipFile

from darkseid.zipfile_remove import ZipFileWithRemove

if TYPE_CHECKING:
from pathlib import Path
from typing import cast

import rarfile

Expand Down Expand Up @@ -51,9 +50,9 @@ def read_file(self: ZipArchiver, archive_file: str) -> bytes:
"""

try:
with zipfile.ZipFile(self.path, mode="r") as zf:
with ZipFile(self.path, mode="r") as zf:
return zf.read(archive_file)
except (zipfile.BadZipfile, OSError) as e:
except (BadZipfile, OSError) as e:
logger.exception(
"Error reading zip archive %s :: %s",
self.path,
Expand All @@ -71,8 +70,20 @@ def remove_file(self: ZipArchiver, archive_file: str) -> bool:
Returns:
bool: True if the file was successfully removed, False otherwise.
"""

return self._rebuild([archive_file])
try:
with ZipFileWithRemove(self.path, "a") as zf:
zf.remove(archive_file)
except KeyError:
return False
except (BadZipfile, OSError):
logger.exception(
"Error writing zip archive %s :: %s",
self.path,
archive_file,
)
return False
else:
return True

def remove_files(self: ZipArchiver, filename_lst: list[str]) -> bool:
"""
Expand All @@ -84,42 +95,43 @@ def remove_files(self: ZipArchiver, filename_lst: list[str]) -> bool:
Returns:
bool: True if all files were successfully removed, False otherwise.
"""

return self._rebuild(filename_lst)

def write_file(self: ZipArchiver, archive_file: str, data: str) -> bool:
files = set(self.get_filename_list())
if filenames_to_remove := [filename for filename in filename_lst if filename in files]:
try:
with ZipFileWithRemove(self.path, "a") as zf:
for filename in filenames_to_remove:
zf.remove(filename)
except (BadZipfile, OSError):
logger.exception(
"Error writing zip archive %s :: %s",
self.path,
filename,
)
return False
return True

def write_file(self: ZipArchiver, fn: str, data: str) -> bool:
"""
Writes data to a file in the ZIP archive.
Args:
archive_file (str): The file to write to in the archive.
fn (str): The file to write to in the archive.
data (str): The data to write to the file.
Returns:
bool: True if the write operation was successful, False otherwise.
"""

# At the moment, no other option but to rebuild the whole
# zip archive w/o the indicated file. Very sucky, but maybe
# another solution can be found
files = self.get_filename_list()
if archive_file in files:
self._rebuild([archive_file])

compress = ZIP_DEFLATED if self.IMAGE_EXT_RE.search(fn) is None else ZIP_STORED
try:
# now just add the archive file as a new one
with zipfile.ZipFile(
self.path,
mode="a",
allowZip64=True,
compression=zipfile.ZIP_DEFLATED,
) as zf:
zf.writestr(archive_file, data)
except (zipfile.BadZipfile, OSError):
with ZipFileWithRemove(self.path, "a") as zf:
if fn in set(zf.namelist()):
zf.remove(fn)
zf.writestr(fn, data, compress_type=compress, compresslevel=9)
except (BadZipfile, OSError):
logger.exception(
"Error writing zip archive %s :: %s",
self.path,
archive_file,
fn,
)
return False
else:
Expand All @@ -137,45 +149,12 @@ def get_filename_list(self: ZipArchiver) -> list[str]:
"""

try:
with zipfile.ZipFile(self.path, mode="r") as zf:
with ZipFile(self.path, mode="r") as zf:
return zf.namelist()
except (zipfile.BadZipfile, OSError):
except (BadZipfile, OSError):
logger.exception("Error listing files in zip archive: %s", self.path)
return []

def _rebuild(self: ZipArchiver, exclude_list: list[str]) -> bool:
"""
Rebuilds the ZIP archive excluding specified files.
Args:
exclude_list (list[str]): The list of files to exclude from the rebuild.
Returns:
bool: True if the rebuild was successful, False otherwise.
"""

try:
with zipfile.ZipFile(
tempfile.NamedTemporaryFile(dir=self.path.parent, delete=False),
"w",
allowZip64=True,
) as zout:
with zipfile.ZipFile(self.path, mode="r") as zin:
for item in zin.infolist():
buffer = zin.read(item.filename)
if item.filename not in exclude_list:
zout.writestr(item, buffer)

# replace with the new file
self.path.unlink(missing_ok=True)
zout.close() # Required on Windows
shutil.move(cast(str, zout.filename), self.path)
except (zipfile.BadZipfile, OSError):
logger.exception("Error rebuilding zip file: %s", self.path)
return False
else:
return True

def copy_from_archive(self: ZipArchiver, other_archive: Archiver) -> bool:
"""
Copies files from another archive to the ZIP archive.
Expand All @@ -188,16 +167,21 @@ def copy_from_archive(self: ZipArchiver, other_archive: Archiver) -> bool:
"""

try:
with zipfile.ZipFile(self.path, mode="w", allowZip64=True) as zout:
with ZipFile(self.path, mode="w", allowZip64=True) as zout:
for filename in other_archive.get_filename_list():
try:
data = other_archive.read_file(filename)
except rarfile.BadRarFile:
# Skip any bad images in the file.
continue
if data is not None:
zout.writestr(filename, data)
except (zipfile.BadZipfile, OSError) as e:
compress = (
ZIP_DEFLATED
if self.IMAGE_EXT_RE.search(filename) is None
else ZIP_STORED
)
zout.writestr(filename, data, compress_type=compress, compresslevel=9)
except (BadZipfile, OSError) as e:
# Remove any partial files created
if self.path.exists():
self.path.unlink()
Expand Down
20 changes: 12 additions & 8 deletions darkseid/comic.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,12 @@ def remove_metadata(self: Comic) -> bool:
"""

if self.has_metadata():
write_success = self._archiver.remove_file(self._ci_xml_filename)
metadata_files = [
path
for path in self._archiver.get_filename_list()
if Path(path).name.lower() == self._ci_xml_filename.lower()
]
write_success = self._archiver.remove_files(metadata_files)
return self._successful_write(write_success, False, None)
return True

Expand Down Expand Up @@ -377,6 +382,11 @@ def _successful_write(
self.reset_cache()
return write_success

def _check_for_metadata(self: Comic) -> bool:
target_filename = self._ci_xml_filename.lower()
filenames = {Path(path).name.lower() for path in self._archiver.get_filename_list()}
return target_filename in filenames

def has_metadata(self: Comic) -> bool:
"""
Checks to see if the archive has metadata.
Expand All @@ -386,13 +396,7 @@ def has_metadata(self: Comic) -> bool:
"""

if self._has_md is None:
self._has_md = bool(
self.seems_to_be_a_comic_archive()
and (
not self.seems_to_be_a_comic_archive()
or self._ci_xml_filename in self._archiver.get_filename_list()
),
)
self._has_md = self.seems_to_be_a_comic_archive() and self._check_for_metadata()

return self._has_md

Expand Down
6 changes: 4 additions & 2 deletions darkseid/comicinfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,8 @@ def get_resource_list(resource: list[Basic] | list[Arc]) -> str | None:

if md.publisher:
assign("Publisher", md.publisher.name)
assign("Imprint", md.imprint)
if md.imprint:
assign("Imprint", md.imprint.name)
assign("Genre", get_resource_list(md.genres))
assign("Web", md.web_link)
assign("PageCount", md.page_count)
Expand Down Expand Up @@ -345,7 +346,8 @@ def get(txt: str) -> str | int | None:
md.cover_date = date(tmp_year, tmp_month, 1)

md.publisher = Basic(xlate(get("Publisher")))
md.imprint = xlate(get("Imprint"))
if imprint := xlate(get("Imprint")): # Make sure Imprint element is present.
md.imprint = Basic(imprint)
md.genres = self.string_to_resource(xlate(get("Genre")))
md.web_link = xlate(get("Web"))
md.series.language = xlate(get("LanguageISO"))
Expand Down
8 changes: 4 additions & 4 deletions darkseid/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
from __future__ import annotations

from dataclasses import dataclass, field, fields
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, TypedDict

if TYPE_CHECKING:
from datetime import date
from decimal import Decimal
from typing import TypedDict


import pycountry

Expand Down Expand Up @@ -374,6 +374,7 @@ class Metadata:
collection_title (Optional[str]): The title of the collection.
stories (list[Basic]): The list of stories.
publisher (Optional[Basic]): The publisher information.
imprint (Optional[Basic]): The imprint information.
cover_date (Optional[date]): The cover date.
store_date (Optional[date]): The store date.
prices (list[Price]): The list of prices.
Expand All @@ -387,7 +388,6 @@ class Metadata:
alternate_series (Optional[str]): The alternate series.
alternate_number (Optional[str]): The alternate number.
alternate_count (Optional[int]): The count of alternates.
imprint (Optional[str]): The imprint.
notes (Optional[str]): The notes.
web_link (Optional[str]): The web link.
manga (Optional[str]): The manga information.
Expand Down Expand Up @@ -435,6 +435,7 @@ class Metadata:
collection_title: str | None = None
stories: list[Basic] = field(default_factory=list)
publisher: Basic | None = None
imprint: Basic | None = None
cover_date: date | None = None
store_date: date | None = None
prices: list[Price] = field(default_factory=list)
Expand All @@ -450,7 +451,6 @@ class Metadata:
alternate_series: str | None = None
alternate_number: str | None = None
alternate_count: int | None = None
imprint: str | None = None
notes: str | None = None
web_link: str | None = None
manga: str | None = None
Expand Down
Loading

0 comments on commit 84b5b16

Please sign in to comment.