223 lines
7.3 KiB
Python
223 lines
7.3 KiB
Python
|
|
#!/usr/bin/env python3
|
|||
|
|
"""
|
|||
|
|
从 /data/guitar/ 把旧 guitar app 的曲库导入到 music API。
|
|||
|
|
|
|||
|
|
跑法(在 oci 上,源数据原地):
|
|||
|
|
python3 import_guitar.py --src /data/guitar --api https://music.famzheng.me
|
|||
|
|
|
|||
|
|
每个 mp3 = 一个 piece:
|
|||
|
|
- title / artist 从文件名 "{title} - {artist}.mp3" 拆
|
|||
|
|
- lyrics 来自同名 .lrc(UTF-8)
|
|||
|
|
- chord png 来自 chords/{sanitize(artist)}-{sanitize(title)}.png(如有)
|
|||
|
|
- play_count 来自 playcounts.json[mp3 文件名]
|
|||
|
|
- category 设为 "流行"(旧 guitar 里都是流行/吉他歌)
|
|||
|
|
|
|||
|
|
幂等:以 (title, artist) 作主键去重,已有 piece 跳过附件重传。
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import argparse
|
|||
|
|
import json
|
|||
|
|
import re
|
|||
|
|
import sys
|
|||
|
|
import urllib.request
|
|||
|
|
import urllib.parse
|
|||
|
|
import urllib.error
|
|||
|
|
import os
|
|||
|
|
from pathlib import Path
|
|||
|
|
|
|||
|
|
CATEGORY = "流行"
|
|||
|
|
|
|||
|
|
|
|||
|
|
def sanitize(text: str) -> str:
|
|||
|
|
"""匹配旧 chord_server.py 的 sanitize:替换非法文件名字符为 -"""
|
|||
|
|
return re.sub(r'[<>:"/\\|?*]', '-', text).strip()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def parse_filename(stem: str) -> tuple[str, str]:
|
|||
|
|
"""'{title} - {artist}' → (title, artist)"""
|
|||
|
|
parts = stem.split(' - ')
|
|||
|
|
title = parts[0].strip()
|
|||
|
|
artist = ' - '.join(parts[1:]).strip() if len(parts) > 1 else ''
|
|||
|
|
return title, artist
|
|||
|
|
|
|||
|
|
|
|||
|
|
def http_request(method: str, url: str, *, json_body=None, file_field=None,
|
|||
|
|
timeout: int = 600) -> dict:
|
|||
|
|
"""简单 HTTP 客户端,return JSON dict。失败抛异常。"""
|
|||
|
|
headers = {}
|
|||
|
|
data = None
|
|||
|
|
if json_body is not None:
|
|||
|
|
data = json.dumps(json_body).encode('utf-8')
|
|||
|
|
headers['Content-Type'] = 'application/json'
|
|||
|
|
elif file_field is not None:
|
|||
|
|
boundary = '----music-import-' + os.urandom(8).hex()
|
|||
|
|
body = []
|
|||
|
|
for field_name, filename, content, mime in file_field:
|
|||
|
|
body.append(f'--{boundary}\r\n'.encode())
|
|||
|
|
disp = (f'Content-Disposition: form-data; name="{field_name}"; '
|
|||
|
|
f'filename="{filename}"\r\n').encode()
|
|||
|
|
body.append(disp)
|
|||
|
|
body.append(f'Content-Type: {mime}\r\n\r\n'.encode())
|
|||
|
|
body.append(content)
|
|||
|
|
body.append(b'\r\n')
|
|||
|
|
body.append(f'--{boundary}--\r\n'.encode())
|
|||
|
|
data = b''.join(body)
|
|||
|
|
headers['Content-Type'] = f'multipart/form-data; boundary={boundary}'
|
|||
|
|
req = urllib.request.Request(url, data=data, method=method, headers=headers)
|
|||
|
|
with urllib.request.urlopen(req, timeout=timeout) as resp:
|
|||
|
|
return json.loads(resp.read().decode('utf-8'))
|
|||
|
|
|
|||
|
|
|
|||
|
|
def list_pieces(api: str) -> list:
|
|||
|
|
return http_request('GET', f'{api}/api/pieces')
|
|||
|
|
|
|||
|
|
|
|||
|
|
def create_piece(api: str, title: str, artist: str, lyrics: str | None) -> int:
|
|||
|
|
body = {
|
|||
|
|
'title': title,
|
|||
|
|
'artist': artist or None,
|
|||
|
|
'category': CATEGORY,
|
|||
|
|
'lyrics': lyrics or None,
|
|||
|
|
}
|
|||
|
|
r = http_request('POST', f'{api}/api/pieces', json_body=body)
|
|||
|
|
return int(r['id'])
|
|||
|
|
|
|||
|
|
|
|||
|
|
def patch_play_count(api: str, piece_id: int, count: int):
|
|||
|
|
http_request('PATCH', f'{api}/api/pieces/{piece_id}',
|
|||
|
|
json_body={'play_count': count})
|
|||
|
|
|
|||
|
|
|
|||
|
|
def upload_attachment(api: str, piece_id: int, path: Path,
|
|||
|
|
mime: str, role: str | None = None) -> dict:
|
|||
|
|
url = f'{api}/api/pieces/{piece_id}/attachments'
|
|||
|
|
if role:
|
|||
|
|
url += '?role=' + urllib.parse.quote(role)
|
|||
|
|
bytes_content = path.read_bytes()
|
|||
|
|
return http_request('POST', url, file_field=[
|
|||
|
|
('files', path.name, bytes_content, mime),
|
|||
|
|
])
|
|||
|
|
|
|||
|
|
|
|||
|
|
def detect_audio_mime(path: Path) -> str:
|
|||
|
|
ext = path.suffix.lower()
|
|||
|
|
return {
|
|||
|
|
'.mp3': 'audio/mpeg',
|
|||
|
|
'.m4a': 'audio/mp4',
|
|||
|
|
'.flac': 'audio/flac',
|
|||
|
|
'.ogg': 'audio/ogg',
|
|||
|
|
'.wav': 'audio/wav',
|
|||
|
|
}.get(ext, 'application/octet-stream')
|
|||
|
|
|
|||
|
|
|
|||
|
|
def main():
|
|||
|
|
ap = argparse.ArgumentParser()
|
|||
|
|
ap.add_argument('--src', default='/data/guitar', help='guitar 数据根目录')
|
|||
|
|
ap.add_argument('--api', required=True, help='music API base,e.g. https://music.famzheng.me')
|
|||
|
|
ap.add_argument('--dry-run', action='store_true', help='只打印不实际调用')
|
|||
|
|
ap.add_argument('--limit', type=int, default=0, help='测试用,最多导入 N 首(0=全部)')
|
|||
|
|
args = ap.parse_args()
|
|||
|
|
|
|||
|
|
src = Path(args.src)
|
|||
|
|
if not src.exists():
|
|||
|
|
print(f'[!] {src} 不存在', file=sys.stderr)
|
|||
|
|
sys.exit(1)
|
|||
|
|
|
|||
|
|
chords_dir = src / 'chords'
|
|||
|
|
playcounts_file = src / 'playcounts.json'
|
|||
|
|
playcounts = {}
|
|||
|
|
if playcounts_file.exists():
|
|||
|
|
try:
|
|||
|
|
playcounts = json.loads(playcounts_file.read_text())
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f'[!] 读 playcounts.json 失败: {e}', file=sys.stderr)
|
|||
|
|
|
|||
|
|
# 已有 pieces 去重
|
|||
|
|
existing = {}
|
|||
|
|
if not args.dry_run:
|
|||
|
|
try:
|
|||
|
|
for p in list_pieces(args.api):
|
|||
|
|
key = (p['title'], p.get('artist') or '')
|
|||
|
|
existing[key] = p['id']
|
|||
|
|
print(f'[i] 远端已有 {len(existing)} 首曲目,重复的会跳过创建')
|
|||
|
|
except urllib.error.URLError as e:
|
|||
|
|
print(f'[!] 连不上 {args.api}: {e}', file=sys.stderr)
|
|||
|
|
sys.exit(1)
|
|||
|
|
|
|||
|
|
mp3s = sorted(src.glob('*.mp3'))
|
|||
|
|
if args.limit:
|
|||
|
|
mp3s = mp3s[:args.limit]
|
|||
|
|
|
|||
|
|
total = len(mp3s)
|
|||
|
|
ok = skipped = failed = 0
|
|||
|
|
|
|||
|
|
for i, mp3 in enumerate(mp3s, 1):
|
|||
|
|
stem = mp3.stem
|
|||
|
|
title, artist = parse_filename(stem)
|
|||
|
|
if not title:
|
|||
|
|
print(f'[!] [{i}/{total}] 无法解析文件名: {mp3.name}', file=sys.stderr)
|
|||
|
|
failed += 1
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
key = (title, artist)
|
|||
|
|
prefix = f'[{i}/{total}] {title} - {artist}'
|
|||
|
|
|
|||
|
|
if key in existing:
|
|||
|
|
print(f' ↻ {prefix} (已存在 id={existing[key]}, 跳过)')
|
|||
|
|
skipped += 1
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
# lrc
|
|||
|
|
lrc_path = mp3.with_suffix('.lrc')
|
|||
|
|
lyrics = None
|
|||
|
|
if lrc_path.exists():
|
|||
|
|
try:
|
|||
|
|
lyrics = lrc_path.read_text(encoding='utf-8')
|
|||
|
|
except UnicodeDecodeError:
|
|||
|
|
lyrics = lrc_path.read_text(encoding='gbk', errors='replace')
|
|||
|
|
|
|||
|
|
# chord png
|
|||
|
|
chord_path = chords_dir / f'{sanitize(artist)}-{sanitize(title)}.png'
|
|||
|
|
if not chord_path.exists():
|
|||
|
|
chord_path = None
|
|||
|
|
|
|||
|
|
play_count = int(playcounts.get(mp3.name, 0))
|
|||
|
|
|
|||
|
|
if args.dry_run:
|
|||
|
|
print(f' + {prefix}'
|
|||
|
|
f'{" [词]" if lyrics else ""}'
|
|||
|
|
f'{" [谱]" if chord_path else ""}'
|
|||
|
|
f'{f" [{play_count}次]" if play_count else ""}')
|
|||
|
|
ok += 1
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
piece_id = create_piece(args.api, title, artist, lyrics)
|
|||
|
|
print(f' + {prefix} → id={piece_id}', end='', flush=True)
|
|||
|
|
|
|||
|
|
upload_attachment(args.api, piece_id, mp3,
|
|||
|
|
detect_audio_mime(mp3))
|
|||
|
|
print(' [audio]', end='', flush=True)
|
|||
|
|
|
|||
|
|
if chord_path:
|
|||
|
|
upload_attachment(args.api, piece_id, chord_path,
|
|||
|
|
'image/png', role='chord')
|
|||
|
|
print(' [chord]', end='', flush=True)
|
|||
|
|
|
|||
|
|
if play_count:
|
|||
|
|
patch_play_count(args.api, piece_id, play_count)
|
|||
|
|
print(f' [{play_count}次]', end='', flush=True)
|
|||
|
|
|
|||
|
|
print()
|
|||
|
|
ok += 1
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f'\n[!] {prefix} 失败: {e}', file=sys.stderr)
|
|||
|
|
failed += 1
|
|||
|
|
|
|||
|
|
print()
|
|||
|
|
print(f'完成: ok={ok} skipped={skipped} failed={failed} total={total}')
|
|||
|
|
|
|||
|
|
|
|||
|
|
if __name__ == '__main__':
|
|||
|
|
main()
|