Skip to content

Commit

Permalink
feat: 歌曲信息中的图片改为url #190
Browse files Browse the repository at this point in the history
  • Loading branch information
hanxi committed Sep 24, 2024
1 parent d214cc8 commit baf9a83
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 154 deletions.
15 changes: 6 additions & 9 deletions test/test_music_tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
SUPPORT_MUSIC_TYPE,
)
from xiaomusic.utils import (
get_audio_metadata,
extract_audio_metadata,
traverse_music_directory,
)

Expand All @@ -20,12 +20,8 @@
async def test_one_music(filename):
# 获取播放时长
try:
metadata = get_audio_metadata(filename)
print(metadata.title, metadata.album)
if metadata:
lyrics = metadata.lyrics
if lyrics:
print(f"歌曲 : {filename}{lyrics}")
metadata = extract_audio_metadata(filename, "cache/picture_cache")
print(metadata)
except Exception as e:
print(f"歌曲 : {filename} no tag {e}")
traceback.print_exc()
Expand All @@ -34,13 +30,14 @@ async def test_one_music(filename):
async def main(directory):
# 获取所有歌曲文件
local_musics = traverse_music_directory(directory, 10, [], SUPPORT_MUSIC_TYPE)
print(local_musics)
for _, files in local_musics.items():
for file in files:
await test_one_music(file)
print(file)
# await test_one_music(file)
pass

await test_one_music("./music/一生何求.mp3")
await test_one_music("./music/程响-人间烟火.flac")


if __name__ == "__main__":
Expand Down
7 changes: 7 additions & 0 deletions xiaomusic/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,3 +255,10 @@ def tag_cache_path(self):
os.makedirs(self.cache_dir)
filename = os.path.join(self.cache_dir, "tag_cache.json")
return filename

@property
def picture_cache_path(self):
cache_path = os.path.join(self.cache_dir, "picture_cache")
if not os.path.exists(cache_path):
os.makedirs(cache_path)
return cache_path
15 changes: 15 additions & 0 deletions xiaomusic/httpserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,3 +429,18 @@ async def music_options():
"Accept-Ranges": "bytes",
}
return Response(headers=headers)


@app.get("/picture/{file_path:path}")
async def get_picture(request: Request, file_path: str):
absolute_path = os.path.abspath(config.picture_cache_path)
absolute_file_path = os.path.normpath(os.path.join(absolute_path, file_path))
if not absolute_file_path.startswith(absolute_path):
raise HTTPException(status_code=404, detail="File not found")
if not os.path.exists(absolute_file_path):
raise HTTPException(status_code=404, detail="File not found")

mime_type, _ = mimetypes.guess_type(absolute_file_path)
if mime_type is None:
mime_type = "image/jpeg"
return FileResponse(absolute_file_path, media_type=mime_type)
283 changes: 142 additions & 141 deletions xiaomusic/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import base64
import copy
import difflib
import hashlib
import json
import logging
import mimetypes
Expand All @@ -22,13 +23,14 @@

import aiohttp
import mutagen
from mutagen.asf import ASF
from mutagen.flac import FLAC
from mutagen.id3 import APIC, ID3
from mutagen.monkeysaudio import MonkeysAudio
from mutagen.id3 import ID3, Encoding, TextFrame, TimeStampTextFrame
from mutagen.mp3 import MP3
from mutagen.mp4 import MP4
from mutagen.oggvorbis import OggVorbis
from mutagen.wave import WAVE
from mutagen.wavpack import WavPack
from opencc import OpenCC
from requests.utils import cookiejar_from_dict

Expand Down Expand Up @@ -492,21 +494,22 @@ def chinese_to_number(chinese):
return result


def get_audio_metadata(file_path):
ret = Metadata()
if file_path.endswith(".mp3"):
ret = get_mp3_metadata(file_path)
elif file_path.endswith(".flac"):
ret = get_flac_metadata(file_path)
elif file_path.endswith(".wav"):
ret = get_wav_metadata(file_path)
elif file_path.endswith(".ape"):
ret = get_ape_metadata(file_path)
elif file_path.endswith(".ogg"):
ret = get_ogg_metadata(file_path)
elif file_path.endswith(".m4a"):
ret = get_m4a_metadata(file_path)
return {k: str(v) for k, v in asdict(ret).items()}
def list2str(li, verbose=False):
if len(li) > 5 and not verbose:
return f"{li[:2]} ... {li[-2:]} with len: {len(li)}"
else:
return f"{li}"


async def get_latest_version(package_name: str) -> str:
url = f"https://pypi.org/pypi/{package_name}/json"
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
data = await response.json()
return data["info"]["version"]
else:
return None


@dataclass
Expand All @@ -520,134 +523,132 @@ class Metadata:
lyrics: str = ""


def get_mp3_metadata(file_path):
audio = MP3(file_path, ID3=ID3)
tags = audio.tags
if tags is None:
return Metadata()

# 处理编码
def get_tag_value(tags, k):
if k not in tags:
return ""
v = tags[k]
if isinstance(v, mutagen.id3.TextFrame) and not isinstance(
v, mutagen.id3.TimeStampTextFrame
):
old_ts = "".join(v.text)
if v.encoding == mutagen.id3.Encoding.LATIN1:
bs = old_ts.encode("latin1")
ts = bs.decode("GBK", errors="ignore")
return ts
return old_ts
return v

metadata = Metadata(
title=get_tag_value(tags, "TIT2"),
artist=get_tag_value(tags, "TPE1"),
album=get_tag_value(tags, "TALB"),
year=get_tag_value(tags, "TDRC"),
genre=get_tag_value(tags, "TCON"),
)

for tag in tags.values():
if isinstance(tag, APIC):
metadata.picture = base64.b64encode(tag.data).decode("utf-8")
break

lyrics = tags.getall("USLT")
if lyrics:
metadata.lyrics = lyrics[0]

return metadata


def get_flac_metadata(file_path):
audio = FLAC(file_path)
metadata = Metadata(
title=audio.get("title", [""])[0],
artist=audio.get("artist", [""])[0],
album=audio.get("album", [""])[0],
year=audio.get("date", [""])[0],
genre=audio.get("genre", [""])[0],
)

if audio.pictures:
picture = audio.pictures[0]
metadata.picture = base64.b64encode(picture.data).decode("utf-8")

if "lyrics" in audio:
metadata.lyrics = audio["lyrics"][0]

return metadata


def get_wav_metadata(file_path):
audio = WAVE(file_path)
metadata = Metadata(
title=audio.get("TIT2", [""])[0],
artist=audio.get("TPE1", [""])[0],
album=audio.get("TALB", [""])[0],
year=audio.get("TDRC", [""])[0],
genre=audio.get("TCON", [""])[0],
)
return metadata


def get_ape_metadata(file_path):
audio = MonkeysAudio(file_path)
metadata = Metadata(
title=audio.get("TIT2", [""])[0],
artist=audio.get("TPE1", [""])[0],
album=audio.get("TALB", [""])[0],
year=audio.get("TDRC", [""])[0],
genre=audio.get("TCON", [""])[0],
)
return metadata


def get_ogg_metadata(file_path):
audio = OggVorbis(file_path)
metadata = Metadata(
title=audio.get("title", [""])[0],
artist=audio.get("artist", [""])[0],
album=audio.get("album", [""])[0],
year=audio.get("date", [""])[0],
genre=audio.get("genre", [""])[0],
)
return metadata

def _get_alltag_value(tags, k):
v = tags.getall(k)
if len(v) > 0:
return _to_utf8(v[0])
return ""


def _get_tag_value(tags, k):
if k not in tags:
return ""
v = tags[k]
return _to_utf8(v)


def _to_utf8(v):
if isinstance(v, TextFrame) and not isinstance(v, TimeStampTextFrame):
old_ts = "".join(v.text)
if v.encoding == Encoding.LATIN1:
bs = old_ts.encode("latin1")
ts = bs.decode("GBK", errors="ignore")
return ts
return old_ts
elif isinstance(v, list):
return "".join(v)
return str(v)


def _save_picture(picture_data, save_root, file_path):
# 计算文件名的哈希值
file_hash = hashlib.md5(file_path.encode("utf-8")).hexdigest()
# 创建目录结构
dir_path = os.path.join(save_root, file_hash[-6:])
os.makedirs(dir_path, exist_ok=True)

# 检测图片格式
if picture_data[:3] == b"\xff\xd8\xff":
ext = "jpg"
elif picture_data[:8] == b"\x89PNG\r\n\x1a\n":
ext = "png"
else:
ext = "bin" # 未知格式

def get_m4a_metadata(file_path):
audio = MP4(file_path)
metadata = Metadata(
title=audio.tags.get("\xa9nam", [""])[0],
artist=audio.tags.get("\xa9ART", [""])[0],
album=audio.tags.get("\xa9alb", [""])[0],
year=audio.tags.get("\xa9day", [""])[0],
genre=audio.tags.get("\xa9gen", [""])[0],
)
# 保存图片
filename = os.path.basename(file_path)
(name, _) = os.path.splitext(filename)
picture_path = os.path.join(dir_path, f"{name}.{ext}")
with open(picture_path, "wb") as img:
img.write(picture_data)
return picture_path

if "covr" in audio.tags:
cover = audio.tags["covr"][0]
metadata.picture = base64.b64encode(cover).decode("utf-8")

return metadata
def extract_audio_metadata(file_path, save_root):
audio = mutagen.File(file_path)
metadata = Metadata()
tags = audio.tags
if tags is None:
return asdict(metadata)

if isinstance(audio, MP3):
metadata.title = _get_tag_value(tags, "TIT2")
metadata.artist = _get_tag_value(tags, "TPE1")
metadata.album = _get_tag_value(tags, "TALB")
metadata.year = _get_tag_value(tags, "TDRC")
metadata.genre = _get_tag_value(tags, "TCON")
if "APIC:" in tags:
metadata.picture = _save_picture(tags["APIC:"].data, save_root, file_path)
metadata.lyrics = _get_alltag_value(tags, "USLT")

elif isinstance(audio, FLAC):
metadata.title = _get_tag_value(tags, "TITLE")
metadata.artist = _get_tag_value(tags, "ARTIST")
metadata.album = _get_tag_value(tags, "ALBUM")
metadata.year = _get_tag_value(tags, "DATE")
metadata.genre = _get_tag_value(tags, "GENRE")
if audio.pictures:
metadata.picture = _save_picture(
audio.pictures[0].data, save_root, file_path
)
if "lyrics" in audio:
metadata.lyrics = audio["lyrics"][0]

elif isinstance(audio, MP4):
metadata.title = _get_tag_value(tags, "\xa9nam")
metadata.artist = _get_tag_value(tags, "\xa9ART")
metadata.album = _get_tag_value(tags, "\xa9alb")
metadata.year = _get_tag_value(tags, "\xa9day")
metadata.genre = _get_tag_value(tags, "\xa9gen")
if "covr" in tags:
metadata.picture = _save_picture(tags["covr"][0], save_root, file_path)

elif isinstance(audio, OggVorbis):
metadata.title = _get_tag_value(tags, "TITLE")
metadata.artist = _get_tag_value(tags, "ARTIST")
metadata.album = _get_tag_value(tags, "ALBUM")
metadata.year = _get_tag_value(tags, "DATE")
metadata.genre = _get_tag_value(tags, "GENRE")
if "metadata_block_picture" in tags:
picture = json.loads(base64.b64decode(tags["metadata_block_picture"][0]))
metadata.picture = _save_picture(
base64.b64decode(picture["data"]), save_root, file_path
)

elif isinstance(audio, ASF):
metadata.title = _get_tag_value(tags, "Title")
metadata.artist = _get_tag_value(tags, "Author")
metadata.album = _get_tag_value(tags, "WM/AlbumTitle")
metadata.year = _get_tag_value(tags, "WM/Year")
metadata.genre = _get_tag_value(tags, "WM/Genre")
if "WM/Picture" in tags:
metadata.picture = _save_picture(
tags["WM/Picture"][0].value, save_root, file_path
)

def list2str(li, verbose=False):
if len(li) > 5 and not verbose:
return f"{li[:2]} ... {li[-2:]} with len: {len(li)}"
else:
return f"{li}"
elif isinstance(audio, WavPack):
metadata.title = _get_tag_value(tags, "Title")
metadata.artist = _get_tag_value(tags, "Artist")
metadata.album = _get_tag_value(tags, "Album")
metadata.year = _get_tag_value(tags, "Year")
metadata.genre = _get_tag_value(tags, "Genre")
if audio.pictures:
metadata.picture = _save_picture(
audio.pictures[0].data, save_root, file_path
)

elif isinstance(audio, WAVE):
metadata.title = _get_tag_value(tags, "Title")
metadata.artist = _get_tag_value(tags, "Artist")

async def get_latest_version(package_name: str) -> str:
url = f"https://pypi.org/pypi/{package_name}/json"
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
data = await response.json()
return data["info"]["version"]
else:
return None
return asdict(metadata)
Loading

0 comments on commit baf9a83

Please sign in to comment.