From c4f0176d589dd63228c140eda6db6f46a214f762 Mon Sep 17 00:00:00 2001 From: Azide Date: Fri, 7 Jun 2024 00:21:03 +0800 Subject: [PATCH] =?UTF-8?q?:recycle:=20=E8=B0=83=E6=95=B4=E6=88=AA?= =?UTF-8?q?=E5=9B=BE=E5=87=BD=E6=95=B0=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- nonebot_bison/platform/ceobecanteen/models.py | 2 +- .../platform/ceobecanteen/platform.py | 73 +++++++++---------- 2 files changed, 34 insertions(+), 41 deletions(-) diff --git a/nonebot_bison/platform/ceobecanteen/models.py b/nonebot_bison/platform/ceobecanteen/models.py index adb5a0f8..1987948d 100644 --- a/nonebot_bison/platform/ceobecanteen/models.py +++ b/nonebot_bison/platform/ceobecanteen/models.py @@ -82,7 +82,7 @@ class CeobeTimestamp(BaseModel): fetcher: int """蹲饼时间,毫秒""" platform_precision: Literal["none", "day", "hour", "minute", "second", "ms"] - """平台时间精度""" + """平台时间精度,不足的长度补0""" platform: int | None = None """平台时间戳,毫秒""" diff --git a/nonebot_bison/platform/ceobecanteen/platform.py b/nonebot_bison/platform/ceobecanteen/platform.py index 7671ebbb..439d97ef 100644 --- a/nonebot_bison/platform/ceobecanteen/platform.py +++ b/nonebot_bison/platform/ceobecanteen/platform.py @@ -1,7 +1,7 @@ +from typing import ParamSpec +from functools import partial from datetime import timedelta -from typing import Any, ParamSpec from collections import defaultdict -from collections.abc import Callable, Coroutine from httpx import AsyncClient from nonebot import logger, require @@ -174,35 +174,7 @@ async def parse(self, raw_post: CeobeCookie) -> Post: target = await self.data_source_cache.get_by_source(raw_post.source) assert target, "target not found" - match raw_post.source.type: - case "arknights-website:official-website": - - async def owss(url: str) -> CeobeTextPic: - return CeobeTextPic(text="", pics=[await self.snapshot_official_website(url)]) - - content, pics = await self.ceobecanteen_snapshot( - raw_post, - owss, - raw_post.item.url, - ) - - case "arknights-game:bulletin-list" if raw_post.item.display_type != 2: - - async def blss(url: str) -> CeobeTextPic: - return CeobeTextPic(text="", pics=[await self.snapshot_bulletin_list(url)]) - - content, pics = await self.ceobecanteen_snapshot( - raw_post, - blss, - raw_post.item.url, - ) - - case _: - - async def npss() -> CeobeTextPic: - raise CeobeSnapshotSkip("无需截图的数据源") - - content, pics = await self.ceobecanteen_snapshot(raw_post, npss) # lambda不允许raise语句 + content, pics = await self.take_snapshot(raw_post) timestamp = raw_post.timestamp.platform or raw_post.timestamp.fetcher if timestamp: @@ -211,7 +183,7 @@ async def npss() -> CeobeTextPic: retweet: Post | None = None if raw_post.item.is_retweeted and raw_post.item.retweeted: raw_retweet_pics = raw_post.item.retweeted.images or [] - retweet_pics = await self.handle_images_list(raw_retweet_pics, raw_post.source.type) + retweet_pics = await self.parse_retweet_images(raw_retweet_pics, raw_post.source.type) retweet = Post( self, @@ -234,6 +206,7 @@ async def npss() -> CeobeTextPic: ) async def snapshot_official_website(self, url: str) -> bytes: + """截取小刻官网的截图""" require("nonebot_plugin_htmlrender") from nonebot_plugin_htmlrender import get_new_page @@ -273,6 +246,7 @@ async def snapshot_official_website(self, url: str) -> bytes: return pic_data async def snapshot_bulletin_list(self, url: str) -> bytes: + """截取小刻公告列表的截图""" selector = "body > div.main > div.container" viewport = {"width": 1024, "height": 19990} @@ -290,30 +264,49 @@ async def snapshot_bulletin_list(self, url: str) -> bytes: else: return pic_data - async def ceobecanteen_snapshot( + async def take_snapshot( self, raw_post: CeobeCookie, - snapshot_func: Callable[P, Coroutine[Any, Any, CeobeTextPic]], - *args: P.args, - **kwargs: P.kwargs, ) -> CeobeTextPic: + """判断数据源类型,判断是否需要截图""" + + match raw_post.source.type: + case "arknights-website:official-website": + + async def owss(url: str) -> CeobeTextPic: + return CeobeTextPic(text="", pics=[await self.snapshot_official_website(url)]) + + snapshot_func = partial(owss, raw_post.item.url) + case "arknights-game:bulletin-list" if raw_post.item.display_type != 2: + + async def blss(url: str) -> CeobeTextPic: + return CeobeTextPic(text="", pics=[await self.snapshot_bulletin_list(url)]) + + snapshot_func = partial(blss, raw_post.item.url) + case _: + + async def npss() -> CeobeTextPic: + raise CeobeSnapshotSkip("无需截图的数据源") + + snapshot_func = partial(npss) + raw_pics = raw_post.default_cookie.images or [] try: if not plugin_config.bison_use_browser: raise CeobeSnapshotSkip("未启用浏览器") - res = await snapshot_func(*args, **kwargs) + res = await snapshot_func() except CeobeSnapshotSkip as e: logger.info(f"skip snapshot: {e}") - pics = await self.handle_images_list(raw_pics, raw_post.source.type) + pics = await self.parse_retweet_images(raw_pics, raw_post.source.type) res = CeobeTextPic(text=raw_post.default_cookie.text, pics=list(pics)) except CeobeSnapshotFailed: logger.exception("snapshot failed") - pics = await self.handle_images_list(raw_pics, raw_post.source.type) + pics = await self.parse_retweet_images(raw_pics, raw_post.source.type) res = CeobeTextPic(text=raw_post.default_cookie.text, pics=list(pics)) return res - async def handle_images_list(self, images: list[CeobeImage], source_type: str) -> list[bytes] | list[str]: + async def parse_retweet_images(self, images: list[CeobeImage], source_type: str) -> list[bytes] | list[str]: if source_type.startswith("weibo"): retweet_pics = await self.download_weibo_image([image.origin_url for image in images]) else: