From baf9a83e50a10350c40548b43839d04c06521c41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B6=B5=E6=9B=A6?= Date: Wed, 25 Sep 2024 03:11:29 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=AD=8C=E6=9B=B2=E4=BF=A1=E6=81=AF?= =?UTF-8?q?=E4=B8=AD=E7=9A=84=E5=9B=BE=E7=89=87=E6=94=B9=E4=B8=BAurl=20#19?= =?UTF-8?q?0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- test/test_music_tags.py | 15 +-- xiaomusic/config.py | 7 + xiaomusic/httpserver.py | 15 +++ xiaomusic/utils.py | 283 ++++++++++++++++++++-------------------- xiaomusic/xiaomusic.py | 24 +++- 5 files changed, 190 insertions(+), 154 deletions(-) diff --git a/test/test_music_tags.py b/test/test_music_tags.py index 97016de81..8025b7924 100644 --- a/test/test_music_tags.py +++ b/test/test_music_tags.py @@ -4,7 +4,7 @@ SUPPORT_MUSIC_TYPE, ) from xiaomusic.utils import ( - get_audio_metadata, + extract_audio_metadata, traverse_music_directory, ) @@ -20,12 +20,8 @@ async def test_one_music(filename): # 获取播放时长 try: - metadata = get_audio_metadata(filename) - print(metadata.title, metadata.album) - if metadata: - lyrics = metadata.lyrics - if lyrics: - print(f"歌曲 : {filename} 的 {lyrics}") + metadata = extract_audio_metadata(filename, "cache/picture_cache") + print(metadata) except Exception as e: print(f"歌曲 : {filename} no tag {e}") traceback.print_exc() @@ -34,13 +30,14 @@ async def test_one_music(filename): async def main(directory): # 获取所有歌曲文件 local_musics = traverse_music_directory(directory, 10, [], SUPPORT_MUSIC_TYPE) - print(local_musics) for _, files in local_musics.items(): for file in files: - await test_one_music(file) + print(file) + # await test_one_music(file) pass await test_one_music("./music/一生何求.mp3") + await test_one_music("./music/程响-人间烟火.flac") if __name__ == "__main__": diff --git a/xiaomusic/config.py b/xiaomusic/config.py index 60a6eec33..7a846ce80 100644 --- a/xiaomusic/config.py +++ b/xiaomusic/config.py @@ -255,3 +255,10 @@ def tag_cache_path(self): os.makedirs(self.cache_dir) filename = os.path.join(self.cache_dir, "tag_cache.json") return filename + + @property + def picture_cache_path(self): + cache_path = os.path.join(self.cache_dir, "picture_cache") + if not os.path.exists(cache_path): + os.makedirs(cache_path) + return cache_path diff --git a/xiaomusic/httpserver.py b/xiaomusic/httpserver.py index fd94d8870..b81fa52a6 100644 --- a/xiaomusic/httpserver.py +++ b/xiaomusic/httpserver.py @@ -429,3 +429,18 @@ async def music_options(): "Accept-Ranges": "bytes", } return Response(headers=headers) + + +@app.get("/picture/{file_path:path}") +async def get_picture(request: Request, file_path: str): + absolute_path = os.path.abspath(config.picture_cache_path) + absolute_file_path = os.path.normpath(os.path.join(absolute_path, file_path)) + if not absolute_file_path.startswith(absolute_path): + raise HTTPException(status_code=404, detail="File not found") + if not os.path.exists(absolute_file_path): + raise HTTPException(status_code=404, detail="File not found") + + mime_type, _ = mimetypes.guess_type(absolute_file_path) + if mime_type is None: + mime_type = "image/jpeg" + return FileResponse(absolute_file_path, media_type=mime_type) diff --git a/xiaomusic/utils.py b/xiaomusic/utils.py index 075715789..39cf3c5dd 100644 --- a/xiaomusic/utils.py +++ b/xiaomusic/utils.py @@ -5,6 +5,7 @@ import base64 import copy import difflib +import hashlib import json import logging import mimetypes @@ -22,13 +23,14 @@ import aiohttp import mutagen +from mutagen.asf import ASF from mutagen.flac import FLAC -from mutagen.id3 import APIC, ID3 -from mutagen.monkeysaudio import MonkeysAudio +from mutagen.id3 import ID3, Encoding, TextFrame, TimeStampTextFrame from mutagen.mp3 import MP3 from mutagen.mp4 import MP4 from mutagen.oggvorbis import OggVorbis from mutagen.wave import WAVE +from mutagen.wavpack import WavPack from opencc import OpenCC from requests.utils import cookiejar_from_dict @@ -492,21 +494,22 @@ def chinese_to_number(chinese): return result -def get_audio_metadata(file_path): - ret = Metadata() - if file_path.endswith(".mp3"): - ret = get_mp3_metadata(file_path) - elif file_path.endswith(".flac"): - ret = get_flac_metadata(file_path) - elif file_path.endswith(".wav"): - ret = get_wav_metadata(file_path) - elif file_path.endswith(".ape"): - ret = get_ape_metadata(file_path) - elif file_path.endswith(".ogg"): - ret = get_ogg_metadata(file_path) - elif file_path.endswith(".m4a"): - ret = get_m4a_metadata(file_path) - return {k: str(v) for k, v in asdict(ret).items()} +def list2str(li, verbose=False): + if len(li) > 5 and not verbose: + return f"{li[:2]} ... {li[-2:]} with len: {len(li)}" + else: + return f"{li}" + + +async def get_latest_version(package_name: str) -> str: + url = f"https://pypi.org/pypi/{package_name}/json" + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + if response.status == 200: + data = await response.json() + return data["info"]["version"] + else: + return None @dataclass @@ -520,134 +523,132 @@ class Metadata: lyrics: str = "" -def get_mp3_metadata(file_path): - audio = MP3(file_path, ID3=ID3) - tags = audio.tags - if tags is None: - return Metadata() - - # 处理编码 - def get_tag_value(tags, k): - if k not in tags: - return "" - v = tags[k] - if isinstance(v, mutagen.id3.TextFrame) and not isinstance( - v, mutagen.id3.TimeStampTextFrame - ): - old_ts = "".join(v.text) - if v.encoding == mutagen.id3.Encoding.LATIN1: - bs = old_ts.encode("latin1") - ts = bs.decode("GBK", errors="ignore") - return ts - return old_ts - return v - - metadata = Metadata( - title=get_tag_value(tags, "TIT2"), - artist=get_tag_value(tags, "TPE1"), - album=get_tag_value(tags, "TALB"), - year=get_tag_value(tags, "TDRC"), - genre=get_tag_value(tags, "TCON"), - ) - - for tag in tags.values(): - if isinstance(tag, APIC): - metadata.picture = base64.b64encode(tag.data).decode("utf-8") - break - - lyrics = tags.getall("USLT") - if lyrics: - metadata.lyrics = lyrics[0] - - return metadata - - -def get_flac_metadata(file_path): - audio = FLAC(file_path) - metadata = Metadata( - title=audio.get("title", [""])[0], - artist=audio.get("artist", [""])[0], - album=audio.get("album", [""])[0], - year=audio.get("date", [""])[0], - genre=audio.get("genre", [""])[0], - ) - - if audio.pictures: - picture = audio.pictures[0] - metadata.picture = base64.b64encode(picture.data).decode("utf-8") - - if "lyrics" in audio: - metadata.lyrics = audio["lyrics"][0] - - return metadata - - -def get_wav_metadata(file_path): - audio = WAVE(file_path) - metadata = Metadata( - title=audio.get("TIT2", [""])[0], - artist=audio.get("TPE1", [""])[0], - album=audio.get("TALB", [""])[0], - year=audio.get("TDRC", [""])[0], - genre=audio.get("TCON", [""])[0], - ) - return metadata - - -def get_ape_metadata(file_path): - audio = MonkeysAudio(file_path) - metadata = Metadata( - title=audio.get("TIT2", [""])[0], - artist=audio.get("TPE1", [""])[0], - album=audio.get("TALB", [""])[0], - year=audio.get("TDRC", [""])[0], - genre=audio.get("TCON", [""])[0], - ) - return metadata - - -def get_ogg_metadata(file_path): - audio = OggVorbis(file_path) - metadata = Metadata( - title=audio.get("title", [""])[0], - artist=audio.get("artist", [""])[0], - album=audio.get("album", [""])[0], - year=audio.get("date", [""])[0], - genre=audio.get("genre", [""])[0], - ) - return metadata - +def _get_alltag_value(tags, k): + v = tags.getall(k) + if len(v) > 0: + return _to_utf8(v[0]) + return "" + + +def _get_tag_value(tags, k): + if k not in tags: + return "" + v = tags[k] + return _to_utf8(v) + + +def _to_utf8(v): + if isinstance(v, TextFrame) and not isinstance(v, TimeStampTextFrame): + old_ts = "".join(v.text) + if v.encoding == Encoding.LATIN1: + bs = old_ts.encode("latin1") + ts = bs.decode("GBK", errors="ignore") + return ts + return old_ts + elif isinstance(v, list): + return "".join(v) + return str(v) + + +def _save_picture(picture_data, save_root, file_path): + # 计算文件名的哈希值 + file_hash = hashlib.md5(file_path.encode("utf-8")).hexdigest() + # 创建目录结构 + dir_path = os.path.join(save_root, file_hash[-6:]) + os.makedirs(dir_path, exist_ok=True) + + # 检测图片格式 + if picture_data[:3] == b"\xff\xd8\xff": + ext = "jpg" + elif picture_data[:8] == b"\x89PNG\r\n\x1a\n": + ext = "png" + else: + ext = "bin" # 未知格式 -def get_m4a_metadata(file_path): - audio = MP4(file_path) - metadata = Metadata( - title=audio.tags.get("\xa9nam", [""])[0], - artist=audio.tags.get("\xa9ART", [""])[0], - album=audio.tags.get("\xa9alb", [""])[0], - year=audio.tags.get("\xa9day", [""])[0], - genre=audio.tags.get("\xa9gen", [""])[0], - ) + # 保存图片 + filename = os.path.basename(file_path) + (name, _) = os.path.splitext(filename) + picture_path = os.path.join(dir_path, f"{name}.{ext}") + with open(picture_path, "wb") as img: + img.write(picture_data) + return picture_path - if "covr" in audio.tags: - cover = audio.tags["covr"][0] - metadata.picture = base64.b64encode(cover).decode("utf-8") - return metadata +def extract_audio_metadata(file_path, save_root): + audio = mutagen.File(file_path) + metadata = Metadata() + tags = audio.tags + if tags is None: + return asdict(metadata) + + if isinstance(audio, MP3): + metadata.title = _get_tag_value(tags, "TIT2") + metadata.artist = _get_tag_value(tags, "TPE1") + metadata.album = _get_tag_value(tags, "TALB") + metadata.year = _get_tag_value(tags, "TDRC") + metadata.genre = _get_tag_value(tags, "TCON") + if "APIC:" in tags: + metadata.picture = _save_picture(tags["APIC:"].data, save_root, file_path) + metadata.lyrics = _get_alltag_value(tags, "USLT") + + elif isinstance(audio, FLAC): + metadata.title = _get_tag_value(tags, "TITLE") + metadata.artist = _get_tag_value(tags, "ARTIST") + metadata.album = _get_tag_value(tags, "ALBUM") + metadata.year = _get_tag_value(tags, "DATE") + metadata.genre = _get_tag_value(tags, "GENRE") + if audio.pictures: + metadata.picture = _save_picture( + audio.pictures[0].data, save_root, file_path + ) + if "lyrics" in audio: + metadata.lyrics = audio["lyrics"][0] + + elif isinstance(audio, MP4): + metadata.title = _get_tag_value(tags, "\xa9nam") + metadata.artist = _get_tag_value(tags, "\xa9ART") + metadata.album = _get_tag_value(tags, "\xa9alb") + metadata.year = _get_tag_value(tags, "\xa9day") + metadata.genre = _get_tag_value(tags, "\xa9gen") + if "covr" in tags: + metadata.picture = _save_picture(tags["covr"][0], save_root, file_path) + + elif isinstance(audio, OggVorbis): + metadata.title = _get_tag_value(tags, "TITLE") + metadata.artist = _get_tag_value(tags, "ARTIST") + metadata.album = _get_tag_value(tags, "ALBUM") + metadata.year = _get_tag_value(tags, "DATE") + metadata.genre = _get_tag_value(tags, "GENRE") + if "metadata_block_picture" in tags: + picture = json.loads(base64.b64decode(tags["metadata_block_picture"][0])) + metadata.picture = _save_picture( + base64.b64decode(picture["data"]), save_root, file_path + ) + elif isinstance(audio, ASF): + metadata.title = _get_tag_value(tags, "Title") + metadata.artist = _get_tag_value(tags, "Author") + metadata.album = _get_tag_value(tags, "WM/AlbumTitle") + metadata.year = _get_tag_value(tags, "WM/Year") + metadata.genre = _get_tag_value(tags, "WM/Genre") + if "WM/Picture" in tags: + metadata.picture = _save_picture( + tags["WM/Picture"][0].value, save_root, file_path + ) -def list2str(li, verbose=False): - if len(li) > 5 and not verbose: - return f"{li[:2]} ... {li[-2:]} with len: {len(li)}" - else: - return f"{li}" + elif isinstance(audio, WavPack): + metadata.title = _get_tag_value(tags, "Title") + metadata.artist = _get_tag_value(tags, "Artist") + metadata.album = _get_tag_value(tags, "Album") + metadata.year = _get_tag_value(tags, "Year") + metadata.genre = _get_tag_value(tags, "Genre") + if audio.pictures: + metadata.picture = _save_picture( + audio.pictures[0].data, save_root, file_path + ) + elif isinstance(audio, WAVE): + metadata.title = _get_tag_value(tags, "Title") + metadata.artist = _get_tag_value(tags, "Artist") -async def get_latest_version(package_name: str) -> str: - url = f"https://pypi.org/pypi/{package_name}/json" - async with aiohttp.ClientSession() as session: - async with session.get(url) as response: - if response.status == 200: - data = await response.json() - return data["info"]["version"] - else: - return None + return asdict(metadata) diff --git a/xiaomusic/xiaomusic.py b/xiaomusic/xiaomusic.py index 0e2f88320..ee162cc19 100644 --- a/xiaomusic/xiaomusic.py +++ b/xiaomusic/xiaomusic.py @@ -41,9 +41,9 @@ convert_file_to_mp3, custom_sort_key, deepcopy_data_no_sensitive_info, + extract_audio_metadata, find_best_match, fuzzyfinder, - get_audio_metadata, get_local_music_duration, get_web_music_duration, is_mp3, @@ -399,7 +399,19 @@ async def get_music_sec_url(self, name): return sec, url def get_music_tags(self, name): - return self.all_music_tags.get(name, asdict(Metadata())) + tags = copy.copy(self.all_music_tags.get(name, asdict(Metadata()))) + picture = tags["picture"] + if picture: + picture = picture.replace("\\", "/") + if picture.startswith(self.config.picture_cache_path): + picture = picture[len(self.config.picture_cache_path) :] + if picture.startswith("/"): + picture = picture[1:] + encoded_name = urllib.parse.quote(picture) + tags["picture"] = ( + f"{self.hostname}:{self.public_port}/picture/{encoded_name}" + ) + return tags def get_music_url(self, name): if self.is_web_music(name): @@ -454,6 +466,8 @@ def refresh_music_tag(self): else: self.log.info("刷新:tag cache 未启用") # TODO: 优化性能? + # TODO 如何安全的清空 picture_cache_path + self.all_music_tags = {} # 需要清空内存残留 self.try_gen_all_music_tag() self.log.info("刷新:已启动重建 tag cache") @@ -512,11 +526,13 @@ async def _gen_all_music_tag(self, only_items: dict = None): # TODO: 网络歌曲获取歌曲额外信息 pass elif os.path.exists(file_or_url): - all_music_tags[name] = get_audio_metadata(file_or_url) + all_music_tags[name] = extract_audio_metadata( + file_or_url, self.config.picture_cache_path + ) else: self.log.info(f"{name}/{file_or_url} 无法更新 tag") except BaseException as e: - self.log.info(f"{e} {file_or_url} error {type(file_or_url)}!") + self.log.exception(f"{e} {file_or_url} error {type(file_or_url)}!") # 全部更新结束后,一次性赋值 self.all_music_tags = all_music_tags # 刷新 tag cache