Skip to content

Commit

Permalink
♻️ 调整截图函数逻辑
Browse files Browse the repository at this point in the history
  • Loading branch information
AzideCupric committed Jun 6, 2024
1 parent f17d22e commit c4f0176
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 41 deletions.
2 changes: 1 addition & 1 deletion nonebot_bison/platform/ceobecanteen/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class CeobeTimestamp(BaseModel):
fetcher: int
"""蹲饼时间,毫秒"""
platform_precision: Literal["none", "day", "hour", "minute", "second", "ms"]
"""平台时间精度"""
"""平台时间精度,不足的长度补0"""
platform: int | None = None
"""平台时间戳,毫秒"""

Expand Down
73 changes: 33 additions & 40 deletions nonebot_bison/platform/ceobecanteen/platform.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import ParamSpec
from functools import partial
from datetime import timedelta
from typing import Any, ParamSpec
from collections import defaultdict
from collections.abc import Callable, Coroutine

from httpx import AsyncClient
from nonebot import logger, require
Expand Down Expand Up @@ -174,35 +174,7 @@ async def parse(self, raw_post: CeobeCookie) -> Post:
target = await self.data_source_cache.get_by_source(raw_post.source)
assert target, "target not found"

match raw_post.source.type:
case "arknights-website:official-website":

async def owss(url: str) -> CeobeTextPic:
return CeobeTextPic(text="", pics=[await self.snapshot_official_website(url)])

content, pics = await self.ceobecanteen_snapshot(
raw_post,
owss,
raw_post.item.url,
)

case "arknights-game:bulletin-list" if raw_post.item.display_type != 2:

async def blss(url: str) -> CeobeTextPic:
return CeobeTextPic(text="", pics=[await self.snapshot_bulletin_list(url)])

content, pics = await self.ceobecanteen_snapshot(
raw_post,
blss,
raw_post.item.url,
)

case _:

async def npss() -> CeobeTextPic:
raise CeobeSnapshotSkip("无需截图的数据源")

content, pics = await self.ceobecanteen_snapshot(raw_post, npss) # lambda不允许raise语句
content, pics = await self.take_snapshot(raw_post)

timestamp = raw_post.timestamp.platform or raw_post.timestamp.fetcher
if timestamp:
Expand All @@ -211,7 +183,7 @@ async def npss() -> CeobeTextPic:
retweet: Post | None = None
if raw_post.item.is_retweeted and raw_post.item.retweeted:
raw_retweet_pics = raw_post.item.retweeted.images or []
retweet_pics = await self.handle_images_list(raw_retweet_pics, raw_post.source.type)
retweet_pics = await self.parse_retweet_images(raw_retweet_pics, raw_post.source.type)

retweet = Post(
self,
Expand All @@ -234,6 +206,7 @@ async def npss() -> CeobeTextPic:
)

async def snapshot_official_website(self, url: str) -> bytes:
"""截取小刻官网的截图"""
require("nonebot_plugin_htmlrender")
from nonebot_plugin_htmlrender import get_new_page

Expand Down Expand Up @@ -273,6 +246,7 @@ async def snapshot_official_website(self, url: str) -> bytes:
return pic_data

async def snapshot_bulletin_list(self, url: str) -> bytes:
"""截取小刻公告列表的截图"""
selector = "body > div.main > div.container"
viewport = {"width": 1024, "height": 19990}

Expand All @@ -290,30 +264,49 @@ async def snapshot_bulletin_list(self, url: str) -> bytes:
else:
return pic_data

async def ceobecanteen_snapshot(
async def take_snapshot(
self,
raw_post: CeobeCookie,
snapshot_func: Callable[P, Coroutine[Any, Any, CeobeTextPic]],
*args: P.args,
**kwargs: P.kwargs,
) -> CeobeTextPic:
"""判断数据源类型,判断是否需要截图"""

match raw_post.source.type:
case "arknights-website:official-website":

async def owss(url: str) -> CeobeTextPic:
return CeobeTextPic(text="", pics=[await self.snapshot_official_website(url)])

snapshot_func = partial(owss, raw_post.item.url)
case "arknights-game:bulletin-list" if raw_post.item.display_type != 2:

async def blss(url: str) -> CeobeTextPic:
return CeobeTextPic(text="", pics=[await self.snapshot_bulletin_list(url)])

snapshot_func = partial(blss, raw_post.item.url)
case _:

async def npss() -> CeobeTextPic:
raise CeobeSnapshotSkip("无需截图的数据源")

snapshot_func = partial(npss)

raw_pics = raw_post.default_cookie.images or []
try:
if not plugin_config.bison_use_browser:
raise CeobeSnapshotSkip("未启用浏览器")
res = await snapshot_func(*args, **kwargs)
res = await snapshot_func()
except CeobeSnapshotSkip as e:
logger.info(f"skip snapshot: {e}")
pics = await self.handle_images_list(raw_pics, raw_post.source.type)
pics = await self.parse_retweet_images(raw_pics, raw_post.source.type)
res = CeobeTextPic(text=raw_post.default_cookie.text, pics=list(pics))
except CeobeSnapshotFailed:
logger.exception("snapshot failed")
pics = await self.handle_images_list(raw_pics, raw_post.source.type)
pics = await self.parse_retweet_images(raw_pics, raw_post.source.type)
res = CeobeTextPic(text=raw_post.default_cookie.text, pics=list(pics))

return res

async def handle_images_list(self, images: list[CeobeImage], source_type: str) -> list[bytes] | list[str]:
async def parse_retweet_images(self, images: list[CeobeImage], source_type: str) -> list[bytes] | list[str]:
if source_type.startswith("weibo"):
retweet_pics = await self.download_weibo_image([image.origin_url for image in images])
else:
Expand Down

0 comments on commit c4f0176

Please sign in to comment.