Files
cube/apps/music/src/main.rs
T

1497 lines
49 KiB
Rust
Raw Normal View History

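//! music.famzheng.me — listening to music + instrument practice.
//!
//! Data model: piece → attachments (kind: video / audio / pdf / image).
//! Metadata lives in sqlite; attachment bytes are stored under `/data/blobs/<id>`, and Range downloads are handled by tower-http `ServeFile`.
//!
//! API
//! - `GET /api/pieces` list (with attachment count + a brief breakdown of kinds)
//! - `POST /api/pieces` create (json: title, category?, notes?)
//! - `GET /api/pieces/:id` detail (including the attachments list)
//! - `PATCH /api/pieces/:id` update title / category / notes
//! - `DELETE /api/pieces/:id` delete the piece + cascade-delete its attachments + remove their files from disk
//! - `POST /api/pieces/:id/attachments` streaming multipart upload; several files per request are allowed
//! - `GET /api/attachments/:id` download (supports Range, used for seeking in video/audio)
//! - `DELETE /api/attachments/:id` delete a single attachment + its file on disk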
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use axum::{
body::Body,
extract::{DefaultBodyLimit, Multipart, Path, Query, Request, State},
http::{header, StatusCode},
response::{
sse::{Event, Sse},
IntoResponse, Json as JsonResp, Response,
},
routing::{delete, get, post},
Router,
};
use futures::Stream;
use std::convert::Infallible;
use rusqlite::{params, Connection, OptionalExtension};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use tokio::io::AsyncWriteExt;
use tower::ServiceExt;
/// Size cap for a single attachment (1 GiB). Not referenced in the visible part of this file.
const SINGLE_FILE_BYTES: usize = 1024 * 1024 * 1024; // 1 GiB per attachment
/// Body-size cap for one upload request (5 GiB); installed as the `DefaultBodyLimit`
/// of the attachment upload route in `main`.
// NOTE(review): the product exceeds `usize::MAX` on 32-bit targets — confirm 64-bit only.
const REQUEST_BYTES: usize = 5 * 1024 * 1024 * 1024; // 5 GiB per upload request
/// Shared handler state; cloned per request (every field is cheap to clone or behind an `Arc`).
#[derive(Clone)]
struct AppState {
/// The single SQLite connection, serialized behind a std `Mutex`.
db: Arc<Mutex<Connection>>,
/// Directory holding attachment blobs; each file is named after its attachment id.
blobs_dir: PathBuf,
/// Root URL of the chord-fetcher sidecar running in the same pod (default http://localhost:8001).
chord_url: String,
/// Shared HTTP client, used for both the chord sidecar and the chat gateway.
http: reqwest::Client,
/// LLM gateway base URL (OpenAI-compatible `/v1`) — same as mochi/config.yaml.
chat_gateway: String,
/// Bearer token sent to the chat gateway; empty when `CHAT_TOKEN` is unset.
chat_token: String,
/// Model name placed in the chat completion request.
chat_model: String,
}
/// Process entry point: reads configuration from environment variables, opens the
/// SQLite database and creates the schema if missing, builds the `/api` router and
/// serves it on port 8080.
///
/// Panics (via `expect`) if the blobs directory cannot be created, the database
/// cannot be opened, the schema batch fails, or the HTTP client cannot be built.
#[tokio::main]
async fn main() -> std::io::Result<()> {
cube_core::init_tracing();
// Paths are overridable through the environment; the defaults live under /data and /dist.
let db_path = std::env::var("DB_PATH").unwrap_or_else(|_| "/data/app.db".into());
let blobs_dir =
PathBuf::from(std::env::var("BLOBS_DIR").unwrap_or_else(|_| "/data/blobs".into()));
let dist = std::env::var("MUSIC_DIST_DIR").unwrap_or_else(|_| "/dist".into());
std::fs::create_dir_all(&blobs_dir).expect("mkdir blobs_dir");
let conn = Connection::open(&db_path).expect("open sqlite");
// Idempotent schema setup (every statement is IF NOT EXISTS). Foreign keys are switched
// on for this connection so the ON DELETE CASCADE clauses take effect.
conn.execute_batch(
"PRAGMA journal_mode=WAL;
PRAGMA foreign_keys=ON;
CREATE TABLE IF NOT EXISTS pieces (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
artist TEXT,
category TEXT,
notes TEXT,
lyrics TEXT,
play_count INTEGER NOT NULL DEFAULT 0,
last_played_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS attachments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
piece_id INTEGER NOT NULL,
kind TEXT NOT NULL,
role TEXT,
mime TEXT NOT NULL,
filename TEXT NOT NULL,
size_bytes INTEGER NOT NULL DEFAULT 0,
sort_order INTEGER NOT NULL DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (piece_id) REFERENCES pieces(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_att_piece ON attachments(piece_id);
CREATE TABLE IF NOT EXISTS chat_messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
piece_id INTEGER NOT NULL,
role TEXT NOT NULL,
content TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (piece_id) REFERENCES pieces(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_chat_piece ON chat_messages(piece_id);
CREATE TABLE IF NOT EXISTS tags (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS piece_tags (
piece_id INTEGER NOT NULL,
tag_id INTEGER NOT NULL,
PRIMARY KEY (piece_id, tag_id),
FOREIGN KEY (piece_id) REFERENCES pieces(id) ON DELETE CASCADE,
FOREIGN KEY (tag_id) REFERENCES tags(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_pt_tag ON piece_tags(tag_id);
CREATE TABLE IF NOT EXISTS playlists (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
description TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS playlist_pieces (
playlist_id INTEGER NOT NULL,
piece_id INTEGER NOT NULL,
sort_order INTEGER NOT NULL DEFAULT 0,
added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (playlist_id, piece_id),
FOREIGN KEY (playlist_id) REFERENCES playlists(id) ON DELETE CASCADE,
FOREIGN KEY (piece_id) REFERENCES pieces(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_pp_piece ON playlist_pieces(piece_id);",
)
.expect("init schema");
tracing::info!(%db_path, blobs = %blobs_dir.display(), "music ready");
let chord_url =
std::env::var("CHORD_URL").unwrap_or_else(|_| "http://localhost:8001".into());
// NOTE(review): the fallback gateway is a hard-coded IP address over plain http —
// confirm this default is intentional for deployments that do not set CHAT_GATEWAY.
let chat_gateway =
std::env::var("CHAT_GATEWAY").unwrap_or_else(|_| "http://3.135.65.204:8848/v1".into());
// An unset CHAT_TOKEN becomes the empty string; `post_chat` rejects requests in that case.
let chat_token = std::env::var("CHAT_TOKEN").unwrap_or_default();
let chat_model =
std::env::var("CHAT_MODEL").unwrap_or_else(|_| "gemma-4-31b-it".into());
// Important: do not configure a client-wide timeout — the chat stream must be able to
// run for a long time. Small requests to the chord sidecar set a per-request
// `.timeout()` instead.
let http = reqwest::Client::builder()
.build()
.expect("build reqwest client");
let state = AppState {
db: Arc::new(Mutex::new(conn)),
blobs_dir,
chord_url,
http,
chat_gateway,
chat_token,
chat_model,
};
// All JSON endpoints; mounted under `/api` below.
let api = Router::new()
.route("/pieces", get(list_pieces).post(create_piece))
.route(
"/pieces/:id",
get(get_piece).patch(patch_piece).delete(delete_piece),
)
.route("/pieces/:id/play", post(record_play))
.route("/pieces/:id/chord/fetch", post(chord_fetch))
.route("/pieces/:id/chord/status", get(chord_status))
.route(
"/pieces/:id/chat",
get(list_chat).post(post_chat).delete(clear_chat),
)
// Uploads get their own, much larger body limit than the framework default.
.route(
"/pieces/:id/attachments",
post(upload_attachments).layer(DefaultBodyLimit::max(REQUEST_BYTES)),
)
.route(
"/attachments/:id",
get(get_attachment).delete(delete_attachment),
)
.route("/tags", get(list_tags).post(create_tag))
.route("/tags/:id", delete(delete_tag))
.route("/playlists", get(list_playlists).post(create_playlist))
.route(
"/playlists/:id",
get(get_playlist).patch(patch_playlist).delete(delete_playlist),
)
.route("/playlists/:id/pieces", post(playlist_add_piece))
.route(
"/playlists/:id/pieces/:piece_id",
delete(playlist_remove_piece),
)
.with_state(state);
// NOTE(review): `cube_core::base(dist)` presumably serves the static frontend from
// `dist` — its definition is outside this file; confirm.
let app = cube_core::base(dist).nest("/api", api);
cube_core::serve(app, 8080).await
}
// ---------- types ----------
/// One row of the piece list returned by `list_pieces`.
#[derive(Serialize)]
struct PieceSummary {
id: i64,
title: String,
artist: Option<String>,
category: Option<String>,
play_count: i64,
last_played_at: Option<String>,
/// Number of attachments belonging to the piece.
attachments: i64,
/// Distinct attachment kinds present on the piece.
kinds: Vec<String>,
/// Names of the tags attached to the piece.
tags: Vec<String>,
/// True when the `lyrics` column is non-NULL and non-empty.
has_lyrics: bool,
created_at: String,
}
/// Full view of a single piece as returned by `get_piece`: all scalar columns plus
/// its attachments and tag names.
#[derive(Serialize)]
struct PieceDetail {
id: i64,
title: String,
artist: Option<String>,
category: Option<String>,
notes: Option<String>,
lyrics: Option<String>,
play_count: i64,
last_played_at: Option<String>,
created_at: String,
/// Attachments ordered by `sort_order`, then id.
attachments: Vec<Attachment>,
/// Tag names, ordered case-insensitively.
tags: Vec<String>,
}
/// Metadata of one attachment row (the bytes themselves live in the blobs directory).
#[derive(Serialize)]
struct Attachment {
id: i64,
/// Coarse media class stored in the `kind` column (the module docs list video/audio/pdf/image).
kind: String,
/// Optional role; the upload handler accepts `chord`, `numbered` or `staff`.
role: Option<String>,
mime: String,
/// Client-supplied file name recorded at upload time.
filename: String,
size_bytes: i64,
sort_order: i64,
created_at: String,
}
/// Query string of the attachment upload endpoint.
#[derive(Deserialize)]
struct UploadQuery {
/// Role applied to every file of the upload batch; validated by the handler.
role: Option<String>,
}
/// JSON body of `POST /api/pieces`. Only `title` is required.
#[derive(Deserialize)]
struct CreatePiece {
title: String,
artist: Option<String>,
category: Option<String>,
notes: Option<String>,
lyrics: Option<String>,
}
#[derive(Deserialize)]
struct PatchPiece {
title: Option<String>,
artist: Option<Option<String>>,
category: Option<Option<String>>,
notes: Option<Option<String>>,
lyrics: Option<Option<String>>,
/// 整体 replace;空数组等于清空
tags: Option<Vec<String>>,
/// admin / import 用:直接写 play_countmvp 无认证)
play_count: Option<i64>,
}
/// Optional filters of `GET /api/pieces`. When both are given, `tag` takes precedence.
#[derive(Deserialize, Default)]
struct ListPiecesQuery {
/// Tag name to filter by; an empty string is treated as no filter.
tag: Option<String>,
/// Playlist id to filter by.
playlist: Option<i64>,
}
// ---------- handlers: pieces ----------
async fn list_pieces(
State(s): State<AppState>,
Query(q): Query<ListPiecesQuery>,
) -> Result<JsonResp<Value>, AppError> {
let conn = s.db.lock().unwrap();
// 构造可选 filter 条件。每行 join 一次拿到 piecesubquery 单独算 attachments / tags
let (filter_join, filter_where, bind): (&str, &str, Vec<rusqlite::types::Value>) =
if let Some(t) = q.tag.as_deref().filter(|s| !s.is_empty()) {
(
"JOIN piece_tags pt ON pt.piece_id = p.id JOIN tags ft ON ft.id = pt.tag_id",
"WHERE ft.name = ?1",
vec![t.to_string().into()],
)
} else if let Some(pid) = q.playlist {
(
"JOIN playlist_pieces pp ON pp.piece_id = p.id",
"WHERE pp.playlist_id = ?1",
vec![pid.into()],
)
} else {
("", "", vec![])
};
let sql = format!(
"SELECT p.id, p.title, p.artist, p.category,
p.play_count, p.last_played_at, p.created_at,
(SELECT COUNT(*) FROM attachments a WHERE a.piece_id = p.id) AS att_count,
COALESCE((SELECT GROUP_CONCAT(DISTINCT a.kind)
FROM attachments a WHERE a.piece_id = p.id), '') AS kinds,
CASE WHEN p.lyrics IS NOT NULL AND length(p.lyrics) > 0 THEN 1 ELSE 0 END AS has_lyrics,
COALESCE((SELECT GROUP_CONCAT(t.name, char(9))
FROM piece_tags pt2 JOIN tags t ON t.id = pt2.tag_id
WHERE pt2.piece_id = p.id), '') AS tags
FROM pieces p
{filter_join}
{filter_where}
GROUP BY p.id
ORDER BY p.title COLLATE NOCASE ASC, p.id ASC"
);
let mut stmt = conn.prepare(&sql)?;
let bind_refs: Vec<&dyn rusqlite::ToSql> = bind.iter().map(|v| v as &dyn rusqlite::ToSql).collect();
let rows = stmt
.query_map(bind_refs.as_slice(), |r| {
let kinds_csv: String = r.get(8)?;
let kinds = if kinds_csv.is_empty() {
Vec::new()
} else {
kinds_csv.split(',').map(|x| x.to_string()).collect()
};
let tags_raw: String = r.get(10)?;
let tags = if tags_raw.is_empty() {
Vec::new()
} else {
tags_raw.split('\t').map(|x| x.to_string()).collect()
};
let has_lyrics: i64 = r.get(9)?;
Ok(PieceSummary {
id: r.get(0)?,
title: r.get(1)?,
artist: r.get(2)?,
category: r.get(3)?,
play_count: r.get(4)?,
last_played_at: r.get(5)?,
created_at: r.get(6)?,
attachments: r.get(7)?,
kinds,
tags,
has_lyrics: has_lyrics != 0,
})
})?
.collect::<Result<Vec<_>, _>>()?;
Ok(JsonResp(json!(rows)))
}
/// `POST /api/pieces` — inserts a new piece and returns `{ "id": <rowid> }`.
///
/// The title is trimmed and must not be blank. Blank `artist` / `category` values are
/// stored as NULL; `notes` and `lyrics` are stored as sent.
async fn create_piece(
    State(s): State<AppState>,
    JsonResp(body): JsonResp<CreatePiece>,
) -> Result<JsonResp<Value>, AppError> {
    /// Trims an optional string and maps a blank result to `None`.
    fn non_blank(value: Option<&str>) -> Option<&str> {
        value.map(str::trim).filter(|t| !t.is_empty())
    }

    let title = body.title.trim();
    if title.is_empty() {
        return Err(AppError::bad_request("title required"));
    }
    let artist = non_blank(body.artist.as_deref());
    let category = non_blank(body.category.as_deref());
    let notes = body.notes.as_deref();
    let lyrics = body.lyrics.as_deref();

    let db = s.db.lock().unwrap();
    db.execute(
        "INSERT INTO pieces (title, artist, category, notes, lyrics)
VALUES (?1, ?2, ?3, ?4, ?5)",
        params![title, artist, category, notes, lyrics],
    )?;
    let new_id = db.last_insert_rowid();
    Ok(JsonResp(json!({ "id": new_id })))
}
async fn get_piece(
State(s): State<AppState>,
Path(id): Path<i64>,
) -> Result<JsonResp<PieceDetail>, AppError> {
let conn = s.db.lock().unwrap();
type PieceRow = (
String,
Option<String>,
Option<String>,
Option<String>,
Option<String>,
i64,
Option<String>,
String,
);
let row: Option<PieceRow> = conn
.query_row(
"SELECT title, artist, category, notes, lyrics, play_count, last_played_at, created_at
FROM pieces WHERE id = ?1",
params![id],
|r| {
Ok((
r.get(0)?,
r.get(1)?,
r.get(2)?,
r.get(3)?,
r.get(4)?,
r.get(5)?,
r.get(6)?,
r.get(7)?,
))
},
)
.optional()?;
let (title, artist, category, notes, lyrics, play_count, last_played_at, created_at) =
row.ok_or(AppError::NotFound)?;
let mut stmt = conn.prepare(
"SELECT id, kind, role, mime, filename, size_bytes, sort_order, created_at
FROM attachments
WHERE piece_id = ?1
ORDER BY sort_order ASC, id ASC",
)?;
let attachments = stmt
.query_map(params![id], |r| {
Ok(Attachment {
id: r.get(0)?,
kind: r.get(1)?,
role: r.get(2)?,
mime: r.get(3)?,
filename: r.get(4)?,
size_bytes: r.get(5)?,
sort_order: r.get(6)?,
created_at: r.get(7)?,
})
})?
.collect::<Result<Vec<_>, _>>()?;
let mut tag_stmt = conn.prepare(
"SELECT t.name FROM piece_tags pt JOIN tags t ON t.id = pt.tag_id
WHERE pt.piece_id = ?1 ORDER BY t.name COLLATE NOCASE",
)?;
let tags: Vec<String> = tag_stmt
.query_map(params![id], |r| r.get::<_, String>(0))?
.collect::<Result<Vec<_>, _>>()?;
Ok(JsonResp(PieceDetail {
id,
title,
artist,
category,
notes,
lyrics,
play_count,
last_played_at,
created_at,
attachments,
tags,
}))
}
/// `PATCH /api/pieces/:id` — applies the fields present in the body to the piece.
///
/// All writes run inside one SQLite transaction, so a failure part-way through (a
/// blank title, a database error, a failing tag insert) leaves the piece untouched
/// instead of half-updated. In particular the tag set is replaced atomically.
///
/// - `title` is trimmed and must not be blank.
/// - `artist` / `category` are trimmed; blank or null values are stored as NULL.
/// - `notes` / `lyrics` are stored as sent (null clears the column).
/// - `tags` replaces the whole tag set; blank names are skipped and unknown tags are
///   created on the fly.
///
/// Returns `{ "ok": true }`, or `AppError::NotFound` when the piece does not exist.
async fn patch_piece(
    State(s): State<AppState>,
    Path(id): Path<i64>,
    JsonResp(body): JsonResp<PatchPiece>,
) -> Result<JsonResp<Value>, AppError> {
    let mut conn = s.db.lock().unwrap();
    // Dropping `tx` without `commit()` rolls back, so every early return below is safe.
    let tx = conn.transaction()?;

    let exists = tx
        .query_row("SELECT 1 FROM pieces WHERE id = ?1", params![id], |_| {
            Ok(true)
        })
        .optional()?
        .unwrap_or(false);
    if !exists {
        return Err(AppError::NotFound);
    }

    if let Some(title) = body.title.as_deref() {
        let t = title.trim();
        if t.is_empty() {
            return Err(AppError::bad_request("title can't be blank"));
        }
        tx.execute("UPDATE pieces SET title = ?1 WHERE id = ?2", params![t, id])?;
    }
    if let Some(artist) = body.artist {
        let artist = artist.as_deref().map(str::trim).filter(|s| !s.is_empty());
        tx.execute(
            "UPDATE pieces SET artist = ?1 WHERE id = ?2",
            params![artist, id],
        )?;
    }
    if let Some(cat) = body.category {
        let cat = cat.as_deref().map(str::trim).filter(|s| !s.is_empty());
        tx.execute(
            "UPDATE pieces SET category = ?1 WHERE id = ?2",
            params![cat, id],
        )?;
    }
    if let Some(notes) = body.notes {
        tx.execute(
            "UPDATE pieces SET notes = ?1 WHERE id = ?2",
            params![notes.as_deref(), id],
        )?;
    }
    if let Some(lyrics) = body.lyrics {
        tx.execute(
            "UPDATE pieces SET lyrics = ?1 WHERE id = ?2",
            params![lyrics.as_deref(), id],
        )?;
    }
    if let Some(pc) = body.play_count {
        tx.execute(
            "UPDATE pieces SET play_count = ?1 WHERE id = ?2",
            params![pc, id],
        )?;
    }
    if let Some(tags) = body.tags {
        // Whole-set replace: wipe the links, then re-link every non-blank name.
        tx.execute("DELETE FROM piece_tags WHERE piece_id = ?1", params![id])?;
        for name in tags {
            let trimmed = name.trim();
            if trimmed.is_empty() {
                continue;
            }
            let tag_id = upsert_tag(&tx, trimmed)?;
            tx.execute(
                "INSERT OR IGNORE INTO piece_tags (piece_id, tag_id) VALUES (?1, ?2)",
                params![id, tag_id],
            )?;
        }
    }

    tx.commit()?;
    Ok(JsonResp(json!({ "ok": true })))
}
/// `POST /api/pieces/:id/play` — bumps `play_count` by one, stamps `last_played_at`
/// with the current time and returns the new `{ "play_count": n }`.
///
/// Responds with `AppError::NotFound` when the UPDATE matches no row.
async fn record_play(
    State(s): State<AppState>,
    Path(id): Path<i64>,
) -> Result<JsonResp<Value>, AppError> {
    let db = s.db.lock().unwrap();

    let touched = db.execute(
        "UPDATE pieces
SET play_count = play_count + 1, last_played_at = CURRENT_TIMESTAMP
WHERE id = ?1",
        params![id],
    )?;
    if touched == 0 {
        return Err(AppError::NotFound);
    }

    // Read the counter back so the client sees the stored value.
    let play_count: i64 = db.query_row(
        "SELECT play_count FROM pieces WHERE id = ?1",
        params![id],
        |row| row.get(0),
    )?;
    Ok(JsonResp(json!({ "play_count": play_count })))
}
/// `DELETE /api/pieces/:id` — removes the piece row and then unlinks the blob file
/// of each attachment it had.
///
/// The attachment ids are collected before the piece is deleted. The lock is
/// released before touching the file system, and unlink failures are ignored
/// (best effort). Responds with `AppError::NotFound` when no piece has this id.
async fn delete_piece(
    State(s): State<AppState>,
    Path(id): Path<i64>,
) -> Result<JsonResp<Value>, AppError> {
    let blob_ids: Vec<i64> = {
        let db = s.db.lock().unwrap();
        let mut stmt = db.prepare("SELECT id FROM attachments WHERE piece_id = ?1")?;
        let mut collected: Vec<i64> = Vec::new();
        let mut cursor = stmt.query(params![id])?;
        while let Some(row) = cursor.next()? {
            collected.push(row.get(0)?);
        }
        match db.execute("DELETE FROM pieces WHERE id = ?1", params![id])? {
            0 => return Err(AppError::NotFound),
            _ => collected,
        }
    };

    for blob_id in blob_ids {
        let blob_path = s.blobs_dir.join(blob_id.to_string());
        let _ = tokio::fs::remove_file(blob_path).await;
    }
    Ok(JsonResp(json!({ "ok": true })))
}
// ---------- handlers: chat ----------
/// One stored chat message of a piece's conversation.
#[derive(Serialize)]
struct ChatMessage {
id: i64,
/// Speaker role; this file writes `user` and `assistant`.
role: String,
content: String,
created_at: String,
}
/// JSON body of `POST /api/pieces/:id/chat`.
#[derive(Deserialize)]
struct PostChat {
/// The user's message; trimmed by the handler and rejected when blank.
message: String,
}
async fn list_chat(
State(s): State<AppState>,
Path(piece_id): Path<i64>,
) -> Result<JsonResp<Value>, AppError> {
let conn = s.db.lock().unwrap();
let mut stmt = conn.prepare(
"SELECT id, role, content, created_at FROM chat_messages
WHERE piece_id = ?1 ORDER BY id ASC",
)?;
let rows = stmt
.query_map(params![piece_id], |r| {
Ok(ChatMessage {
id: r.get(0)?,
role: r.get(1)?,
content: r.get(2)?,
created_at: r.get(3)?,
})
})?
.collect::<Result<Vec<_>, _>>()?;
Ok(JsonResp(json!(rows)))
}
/// `DELETE /api/pieces/:id/chat` — deletes every stored chat message of the piece.
///
/// Always answers `{ "ok": true }`, whether or not any rows were removed.
async fn clear_chat(
    State(s): State<AppState>,
    Path(piece_id): Path<i64>,
) -> Result<JsonResp<Value>, AppError> {
    {
        let db = s.db.lock().unwrap();
        db.execute(
            "DELETE FROM chat_messages WHERE piece_id = ?1",
            params![piece_id],
        )?;
    }
    let ack = json!({ "ok": true });
    Ok(JsonResp(ack))
}
/// `POST /api/pieces/:id/chat` — body {"message": "..."},返回 SSE 流
/// 每个 event data 是文本片段(assistant delta content)。结束时 emit 一个 `done` event。
async fn post_chat(
State(s): State<AppState>,
Path(piece_id): Path<i64>,
JsonResp(body): JsonResp<PostChat>,
) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, AppError> {
let user_msg = body.message.trim().to_string();
if user_msg.is_empty() {
return Err(AppError::bad_request("message required"));
}
if s.chat_token.is_empty() {
return Err(AppError::bad_request("CHAT_TOKEN not configured"));
}
// 拼 messagessystem + history + 新 user
let (system_prompt, history) = build_chat_context(&s, piece_id)?;
let mut openai_messages: Vec<Value> = Vec::new();
if !system_prompt.is_empty() {
openai_messages.push(json!({ "role": "system", "content": system_prompt }));
}
for m in &history {
openai_messages.push(json!({ "role": m.role, "content": m.content }));
}
openai_messages.push(json!({ "role": "user", "content": user_msg }));
let payload = json!({
"model": s.chat_model,
"messages": openai_messages,
"stream": true,
});
let url = format!("{}/chat/completions", s.chat_gateway.trim_end_matches('/'));
let req = s
.http
.post(&url)
.bearer_auth(&s.chat_token)
.json(&payload);
// 先存用户消息(不等 LLM 完)
{
let conn = s.db.lock().unwrap();
conn.execute(
"INSERT INTO chat_messages (piece_id, role, content) VALUES (?1, 'user', ?2)",
params![piece_id, &user_msg],
)?;
}
let (tx, rx) = tokio::sync::mpsc::channel::<Result<Event, Infallible>>(64);
let state_clone = s.clone();
tokio::spawn(async move {
let mut full = String::new();
match req.send().await {
Ok(resp) if resp.status().is_success() => {
use futures::StreamExt;
let mut stream = resp.bytes_stream();
let mut buf = String::new();
while let Some(chunk) = stream.next().await {
let chunk = match chunk {
Ok(b) => b,
Err(e) => {
let _ = tx
.send(Ok(Event::default()
.event("error")
.data(format!("stream: {e}"))))
.await;
break;
}
};
buf.push_str(&String::from_utf8_lossy(&chunk));
while let Some(idx) = buf.find('\n') {
let line = buf[..idx].trim().to_string();
buf.drain(..=idx);
let Some(payload) = line.strip_prefix("data:") else {
continue;
};
let payload = payload.trim();
if payload.is_empty() {
continue;
}
if payload == "[DONE]" {
break;
}
match serde_json::from_str::<Value>(payload) {
Ok(v) => {
if let Some(delta) = v
.get("choices")
.and_then(|c| c.get(0))
.and_then(|c| c.get("delta"))
.and_then(|d| d.get("content"))
.and_then(|c| c.as_str())
{
if !delta.is_empty() {
full.push_str(delta);
if tx
.send(Ok(Event::default().data(delta.to_string())))
.await
.is_err()
{
// client gone
return;
}
}
}
}
Err(e) => {
tracing::warn!(error = %e, raw = %payload, "chat: bad delta json");
}
}
}
}
}
Ok(resp) => {
let st = resp.status();
let body = resp.text().await.unwrap_or_default();
let _ = tx
.send(Ok(Event::default()
.event("error")
.data(format!("gateway {st}: {body}"))))
.await;
}
Err(e) => {
let _ = tx
.send(Ok(Event::default()
.event("error")
.data(format!("connect: {e}"))))
.await;
}
}
// 持久化 assistant
if !full.is_empty() {
let conn = state_clone.db.lock().unwrap();
let _ = conn.execute(
"INSERT INTO chat_messages (piece_id, role, content) VALUES (?1, 'assistant', ?2)",
params![piece_id, &full],
);
}
let _ = tx.send(Ok(Event::default().event("done").data(""))).await;
});
Ok(Sse::new(tokio_stream::wrappers::ReceiverStream::new(rx))
.keep_alive(axum::response::sse::KeepAlive::default()))
}
/// Builds the LLM context for a piece: the system prompt and the stored chat history.
///
/// The system prompt names the piece (title, then artist and category when present,
/// separated by " / "), appends the user's notes, and appends the lyrics cut to at
/// most 4 KiB. The cut is moved back to the nearest UTF-8 character boundary, because
/// slicing a `str` in the middle of a multi-byte character panics.
///
/// History is returned oldest first. Responds with `AppError::NotFound` when the
/// piece does not exist.
fn build_chat_context(
    s: &AppState,
    piece_id: i64,
) -> Result<(String, Vec<ChatMessage>), AppError> {
    /// Upper bound, in bytes, of the lyrics excerpt embedded in the prompt.
    const LYRICS_PROMPT_BYTES: usize = 4096;

    let conn = s.db.lock().unwrap();
    type Row = (
        String,
        Option<String>,
        Option<String>,
        Option<String>,
        Option<String>,
    );
    let row: Option<Row> = conn
        .query_row(
            "SELECT title, artist, category, lyrics, notes FROM pieces WHERE id = ?1",
            params![piece_id],
            |r| {
                Ok((
                    r.get(0)?,
                    r.get(1)?,
                    r.get(2)?,
                    r.get(3)?,
                    r.get(4)?,
                ))
            },
        )
        .optional()?;
    let (title, artist, category, lyrics, notes) = row.ok_or(AppError::NotFound)?;

    let mut sys = String::from(
        "你是麻薯,一个懂音乐、会乐理、爱聊天的助手。用中文回答,简洁直接,必要时用 markdown。\n\n当前曲目:",
    );
    sys.push_str(&title);
    // Separate the fields so title, artist and category do not run together.
    if let Some(a) = artist.as_deref().filter(|s| !s.is_empty()) {
        sys.push_str(" / ");
        sys.push_str(a);
    }
    if let Some(c) = category.as_deref().filter(|s| !s.is_empty()) {
        sys.push_str(" / ");
        sys.push_str(c);
    }
    if let Some(n) = notes.as_deref().filter(|s| !s.is_empty()) {
        sys.push_str(&format!("\n用户笔记:{}", n));
    }
    if let Some(l) = lyrics.as_deref().filter(|s| !s.is_empty()) {
        // Long LRC text would blow up the prompt, so keep at most 4 KiB of it.
        // Index 0 is always a boundary, so the loop terminates.
        let mut end = l.len().min(LYRICS_PROMPT_BYTES);
        while !l.is_char_boundary(end) {
            end -= 1;
        }
        sys.push_str(&format!("\n歌词(截断到 4KB):\n{}", &l[..end]));
    }

    let mut stmt = conn.prepare(
        "SELECT id, role, content, created_at FROM chat_messages
WHERE piece_id = ?1 ORDER BY id ASC",
    )?;
    let history = stmt
        .query_map(params![piece_id], |r| {
            Ok(ChatMessage {
                id: r.get(0)?,
                role: r.get(1)?,
                content: r.get(2)?,
                created_at: r.get(3)?,
            })
        })?
        .collect::<Result<Vec<_>, _>>()?;
    Ok((sys, history))
}
// ---------- handlers: tags ----------
/// `GET /api/tags` — lists every tag with the number of pieces using it, ordered by
/// name (case-insensitive). Unused tags are included with a count of 0.
async fn list_tags(State(s): State<AppState>) -> Result<JsonResp<Value>, AppError> {
    let db = s.db.lock().unwrap();
    let mut stmt = db.prepare(
        "SELECT t.id, t.name, COUNT(pt.piece_id) AS n
FROM tags t LEFT JOIN piece_tags pt ON pt.tag_id = t.id
GROUP BY t.id ORDER BY t.name COLLATE NOCASE ASC",
    )?;

    let mut tags: Vec<Value> = Vec::new();
    let mut cursor = stmt.query([])?;
    while let Some(row) = cursor.next()? {
        let tag_id: i64 = row.get(0)?;
        let tag_name: String = row.get(1)?;
        let usage: i64 = row.get(2)?;
        tags.push(json!({
            "id": tag_id,
            "name": tag_name,
            "count": usage,
        }));
    }
    Ok(JsonResp(json!(tags)))
}
/// JSON body of `POST /api/tags`.
#[derive(Deserialize)]
struct CreateTag {
/// Tag name; trimmed by the handler and rejected when blank.
name: String,
}
/// `POST /api/tags` — creates the tag if it does not exist yet and returns its
/// `{ "id", "name" }` either way. The name is trimmed and must not be blank.
async fn create_tag(
    State(s): State<AppState>,
    JsonResp(body): JsonResp<CreateTag>,
) -> Result<JsonResp<Value>, AppError> {
    let tag_name = body.name.trim();
    if tag_name.is_empty() {
        return Err(AppError::bad_request("name required"));
    }

    let tag_id = {
        let db = s.db.lock().unwrap();
        upsert_tag(&db, tag_name)?
    };
    Ok(JsonResp(json!({ "id": tag_id, "name": tag_name })))
}
/// Ensures a tag named `name` exists and returns its id.
///
/// Inserts the row when the name is new (a conflict on the unique name is a no-op),
/// then looks the id up by name.
fn upsert_tag(conn: &Connection, name: &str) -> Result<i64, rusqlite::Error> {
    const INSERT_IF_MISSING: &str =
        "INSERT INTO tags (name) VALUES (?1) ON CONFLICT(name) DO NOTHING";
    const LOOKUP_ID: &str = "SELECT id FROM tags WHERE name = ?1";

    conn.execute(INSERT_IF_MISSING, params![name])?;
    let tag_id: i64 = conn.query_row(LOOKUP_ID, params![name], |row| row.get(0))?;
    Ok(tag_id)
}
/// `DELETE /api/tags/:id` — deletes the tag; `AppError::NotFound` when no row matched.
async fn delete_tag(
    State(s): State<AppState>,
    Path(id): Path<i64>,
) -> Result<JsonResp<Value>, AppError> {
    let db = s.db.lock().unwrap();
    match db.execute("DELETE FROM tags WHERE id = ?1", params![id])? {
        0 => Err(AppError::NotFound),
        _ => Ok(JsonResp(json!({ "ok": true }))),
    }
}
// ---------- handlers: playlists ----------
/// `GET /api/playlists` — lists all playlists, newest first, each with the number
/// of pieces it contains.
async fn list_playlists(State(s): State<AppState>) -> Result<JsonResp<Value>, AppError> {
    let db = s.db.lock().unwrap();
    let mut stmt = db.prepare(
        "SELECT p.id, p.name, p.description, p.created_at,
COUNT(pp.piece_id) AS n
FROM playlists p LEFT JOIN playlist_pieces pp ON pp.playlist_id = p.id
GROUP BY p.id ORDER BY p.created_at DESC, p.id DESC",
    )?;

    let mut playlists: Vec<Value> = Vec::new();
    let mut cursor = stmt.query([])?;
    while let Some(row) = cursor.next()? {
        let playlist_id: i64 = row.get(0)?;
        let playlist_name: String = row.get(1)?;
        let description: Option<String> = row.get(2)?;
        let created_at: String = row.get(3)?;
        let piece_count: i64 = row.get(4)?;
        playlists.push(json!({
            "id": playlist_id,
            "name": playlist_name,
            "description": description,
            "created_at": created_at,
            "count": piece_count,
        }));
    }
    Ok(JsonResp(json!(playlists)))
}
/// JSON body of `POST /api/playlists`.
#[derive(Deserialize)]
struct CreatePlaylist {
/// Playlist name; trimmed by the handler and rejected when blank.
name: String,
/// Optional free-text description, stored as sent.
description: Option<String>,
}
/// `POST /api/playlists` — inserts a playlist and returns `{ "id": <rowid> }`.
/// The name is trimmed and must not be blank.
async fn create_playlist(
    State(s): State<AppState>,
    JsonResp(body): JsonResp<CreatePlaylist>,
) -> Result<JsonResp<Value>, AppError> {
    let playlist_name = body.name.trim();
    if playlist_name.is_empty() {
        return Err(AppError::bad_request("name required"));
    }
    let description = body.description.as_deref();

    let db = s.db.lock().unwrap();
    db.execute(
        "INSERT INTO playlists (name, description) VALUES (?1, ?2)",
        params![playlist_name, description],
    )?;
    let new_id = db.last_insert_rowid();
    Ok(JsonResp(json!({ "id": new_id })))
}
/// `GET /api/playlists/:id` — returns the playlist's metadata plus its pieces,
/// ordered by `sort_order`, then by when they were added.
///
/// Each piece entry mirrors the list view (attachment count, kinds, `has_lyrics`)
/// and adds its `sort_order` within the playlist. Responds with
/// `AppError::NotFound` when the playlist does not exist.
async fn get_playlist(
    State(s): State<AppState>,
    Path(id): Path<i64>,
) -> Result<JsonResp<Value>, AppError> {
    let db = s.db.lock().unwrap();

    let header: Option<(String, Option<String>, String)> = db
        .query_row(
            "SELECT name, description, created_at FROM playlists WHERE id = ?1",
            params![id],
            |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
        )
        .optional()?;
    let Some((name, description, created_at)) = header else {
        return Err(AppError::NotFound);
    };

    let mut stmt = db.prepare(
        "SELECT p.id, p.title, p.artist, p.category, p.play_count, p.last_played_at,
p.created_at,
(SELECT COUNT(*) FROM attachments a WHERE a.piece_id = p.id) AS att_count,
(SELECT COALESCE(GROUP_CONCAT(DISTINCT a.kind), '')
FROM attachments a WHERE a.piece_id = p.id) AS kinds,
CASE WHEN p.lyrics IS NOT NULL AND length(p.lyrics) > 0 THEN 1 ELSE 0 END,
pp.sort_order
FROM playlist_pieces pp JOIN pieces p ON p.id = pp.piece_id
WHERE pp.playlist_id = ?1
ORDER BY pp.sort_order ASC, pp.added_at ASC",
    )?;

    let mut pieces: Vec<Value> = Vec::new();
    let mut cursor = stmt.query(params![id])?;
    while let Some(row) = cursor.next()? {
        // Kinds arrive comma-joined; an empty string means the piece has no attachments.
        let kinds_csv: String = row.get(8)?;
        let kinds: Vec<&str> = match kinds_csv.as_str() {
            "" => Vec::new(),
            joined => joined.split(',').collect(),
        };
        let lyrics_flag: i64 = row.get(9)?;
        pieces.push(json!({
            "id": row.get::<_, i64>(0)?,
            "title": row.get::<_, String>(1)?,
            "artist": row.get::<_, Option<String>>(2)?,
            "category": row.get::<_, Option<String>>(3)?,
            "play_count": row.get::<_, i64>(4)?,
            "last_played_at": row.get::<_, Option<String>>(5)?,
            "created_at": row.get::<_, String>(6)?,
            "attachments": row.get::<_, i64>(7)?,
            "kinds": kinds,
            "has_lyrics": lyrics_flag != 0,
            "sort_order": row.get::<_, i64>(10)?,
        }));
    }

    Ok(JsonResp(json!({
        "id": id,
        "name": name,
        "description": description,
        "created_at": created_at,
        "pieces": pieces,
    })))
}
#[derive(Deserialize)]
struct PatchPlaylist {
name: Option<String>,
description: Option<Option<String>>,
}
/// `PATCH /api/playlists/:id` — updates the playlist's name and/or description.
///
/// The name, when given, is trimmed and must not be blank. Responds with
/// `AppError::NotFound` when the playlist does not exist.
async fn patch_playlist(
    State(s): State<AppState>,
    Path(id): Path<i64>,
    JsonResp(body): JsonResp<PatchPlaylist>,
) -> Result<JsonResp<Value>, AppError> {
    let db = s.db.lock().unwrap();

    let found = db
        .query_row("SELECT 1 FROM playlists WHERE id = ?1", params![id], |_| Ok(true))
        .optional()?;
    if found != Some(true) {
        return Err(AppError::NotFound);
    }

    if let Some(raw_name) = body.name.as_deref() {
        let new_name = raw_name.trim();
        if new_name.is_empty() {
            return Err(AppError::bad_request("name can't be blank"));
        }
        db.execute(
            "UPDATE playlists SET name = ?1 WHERE id = ?2",
            params![new_name, id],
        )?;
    }

    if let Some(new_description) = body.description {
        db.execute(
            "UPDATE playlists SET description = ?1 WHERE id = ?2",
            params![new_description.as_deref(), id],
        )?;
    }

    Ok(JsonResp(json!({ "ok": true })))
}
/// `DELETE /api/playlists/:id` — deletes the playlist; `AppError::NotFound` when no
/// row matched.
async fn delete_playlist(
    State(s): State<AppState>,
    Path(id): Path<i64>,
) -> Result<JsonResp<Value>, AppError> {
    let db = s.db.lock().unwrap();
    match db.execute("DELETE FROM playlists WHERE id = ?1", params![id])? {
        0 => Err(AppError::NotFound),
        _ => Ok(JsonResp(json!({ "ok": true }))),
    }
}
/// JSON body of `POST /api/playlists/:id/pieces`.
#[derive(Deserialize)]
struct AddPiece {
/// Id of the piece to add to the playlist.
piece_id: i64,
}
async fn playlist_add_piece(
State(s): State<AppState>,
Path(id): Path<i64>,
JsonResp(body): JsonResp<AddPiece>,
) -> Result<JsonResp<Value>, AppError> {
let conn = s.db.lock().unwrap();
conn.execute(
"INSERT INTO playlist_pieces (playlist_id, piece_id, sort_order)
VALUES (?1, ?2,
COALESCE((SELECT MAX(sort_order) FROM playlist_pieces WHERE playlist_id = ?1), 0) + 1)
ON CONFLICT(playlist_id, piece_id) DO NOTHING",
params![id, body.piece_id],
)?;
Ok(JsonResp(json!({ "ok": true })))
}
/// `DELETE /api/playlists/:id/pieces/:piece_id` — removes a piece from a playlist.
///
/// Always answers `{ "ok": true }`, even when the pair was not linked.
async fn playlist_remove_piece(
    State(s): State<AppState>,
    Path((id, piece_id)): Path<(i64, i64)>,
) -> Result<JsonResp<Value>, AppError> {
    {
        let db = s.db.lock().unwrap();
        db.execute(
            "DELETE FROM playlist_pieces WHERE playlist_id = ?1 AND piece_id = ?2",
            params![id, piece_id],
        )?;
    }
    let ack = json!({ "ok": true });
    Ok(JsonResp(ack))
}
// ---------- handlers: chord auto-fetch ----------
/// `POST /api/pieces/:id/chord/fetch` — asks the sidecar to fetch a yopu chord sheet.
///
/// A piece that already has a chord attachment short-circuits to `completed`.
/// Otherwise the sidecar is called with the search query "<artist> <title>" (title
/// only when there is no artist) and its JSON answer is passed through unchanged.
/// Transport, status and decoding failures become `AppError::sidecar`.
async fn chord_fetch(
    State(s): State<AppState>,
    Path(piece_id): Path<i64>,
) -> Result<JsonResp<Value>, AppError> {
    let (title, artist, has_chord) = chord_piece_meta(&s, piece_id)?;
    if has_chord {
        return Ok(JsonResp(json!({ "status": "completed", "reason": "已有吉他谱" })));
    }

    let query = if let Some(a) = artist.as_deref().filter(|a| !a.is_empty()) {
        format!("{a} {title}")
    } else {
        title
    };
    let endpoint = format!("{}/fetch", s.chord_url);
    let request = s
        .http
        .post(&endpoint)
        .query(&[("piece_id", piece_id.to_string()), ("query", query)])
        .timeout(std::time::Duration::from_secs(15));

    let resp = match request.send().await {
        Ok(r) => r,
        Err(e) => return Err(AppError::sidecar(format!("post fetch: {e}"))),
    };
    let st = resp.status();
    if !st.is_success() {
        let body = resp.text().await.unwrap_or_default();
        return Err(AppError::sidecar(format!("sidecar {st}: {body}")));
    }

    match resp.json::<Value>().await {
        Ok(body) => Ok(JsonResp(body)),
        Err(e) => Err(AppError::sidecar(format!("decode: {e}"))),
    }
}
/// `GET /api/pieces/:id/chord/status` — 查询抓取状态。完成时把 png import 成 attachment。
async fn chord_status(
State(s): State<AppState>,
Path(piece_id): Path<i64>,
) -> Result<JsonResp<Value>, AppError> {
let (_title, _artist, has_chord) = chord_piece_meta(&s, piece_id)?;
if has_chord {
return Ok(JsonResp(json!({ "status": "completed", "imported": true })));
}
let url = format!("{}/status/{}", s.chord_url, piece_id);
let resp = s
.http
.get(&url)
.timeout(std::time::Duration::from_secs(10))
.send()
.await
.map_err(|e| AppError::sidecar(format!("get status: {e}")))?;
if !resp.status().is_success() {
return Err(AppError::sidecar(format!("sidecar status: {}", resp.status())));
}
let body: Value = resp
.json()
.await
.map_err(|e| AppError::sidecar(format!("decode: {e}")))?;
let st = body.get("status").and_then(|v| v.as_str()).unwrap_or("none");
let file_exists = body
.get("file_exists")
.and_then(|v| v.as_bool())
.unwrap_or(false);
if st == "completed" && file_exists {
let attachment_id = import_chord_png(&s, piece_id).await?;
// 通知 sidecar 清掉 state + 文件,避免重复 import
let _ = s
.http
.delete(format!("{}/state/{}", s.chord_url, piece_id))
.timeout(std::time::Duration::from_secs(5))
.send()
.await;
return Ok(JsonResp(json!({
"status": "completed",
"imported": true,
"attachment_id": attachment_id,
})));
}
Ok(JsonResp(body))
}
/// Loads what the chord endpoints need about a piece: its title, its artist, and
/// whether it already has an image attachment with role `chord`.
///
/// Responds with `AppError::NotFound` when the piece does not exist.
fn chord_piece_meta(
    s: &AppState,
    piece_id: i64,
) -> Result<(String, Option<String>, bool), AppError> {
    let db = s.db.lock().unwrap();

    let piece: Option<(String, Option<String>)> = db
        .query_row(
            "SELECT title, artist FROM pieces WHERE id = ?1",
            params![piece_id],
            |row| Ok((row.get(0)?, row.get(1)?)),
        )
        .optional()?;
    let Some((title, artist)) = piece else {
        return Err(AppError::NotFound);
    };

    let chord_row = db
        .query_row(
            "SELECT 1 FROM attachments
WHERE piece_id = ?1 AND kind = 'image' AND role = 'chord' LIMIT 1",
            params![piece_id],
            |_| Ok(true),
        )
        .optional()?;
    let has_chord = chord_row == Some(true);

    Ok((title, artist, has_chord))
}
/// Imports the sidecar's `/data/chord-fetch/<piece_id>.png` as a new `chord` image
/// attachment of the piece and returns the attachment id.
///
/// The attachment row is inserted first (its id names the blob file), then the PNG
/// is copied into the blobs directory. If the copy fails the row is deleted again
/// and the I/O error is returned. The recorded size comes from the source file's
/// metadata, read before the copy.
async fn import_chord_png(s: &AppState, piece_id: i64) -> Result<i64, AppError> {
    let src = std::path::PathBuf::from(format!("/data/chord-fetch/{piece_id}.png"));
    let meta = tokio::fs::metadata(&src).await.map_err(AppError::Io)?;
    let size = meta.len() as i64;

    let attachment_id = {
        let db = s.db.lock().unwrap();
        db.execute(
            "INSERT INTO attachments
(piece_id, kind, role, mime, filename, size_bytes, sort_order)
VALUES (?1, 'image', 'chord', 'image/png', 'chord.png', ?2,
COALESCE((SELECT MAX(sort_order) FROM attachments WHERE piece_id = ?1), 0) + 1)",
            params![piece_id, size],
        )?;
        db.last_insert_rowid()
    };

    let dst = s.blobs_dir.join(attachment_id.to_string());
    match tokio::fs::copy(&src, &dst).await {
        Ok(_) => Ok(attachment_id),
        Err(e) => {
            // Copy failed: roll back the row reserved above.
            let db = s.db.lock().unwrap();
            let _ = db.execute("DELETE FROM attachments WHERE id = ?1", params![attachment_id]);
            Err(AppError::Io(e))
        }
    }
}
// ---------- handlers: attachments ----------
/// `POST /api/pieces/:id/attachments?role=chord|numbered|staff` — streaming multipart upload.
///
/// Every file field (whatever its `name`) becomes one attachment; the optional `role`
/// query parameter is applied to every file of the request.
///
/// Responds with `{"attachments": [...]}` describing the attachments that were created.
/// Fails with 400 for an unknown role, an unsupported mime type, a broken multipart
/// stream or a request without files, and with 404 when the piece does not exist.
async fn upload_attachments(
    State(s): State<AppState>,
    Path(piece_id): Path<i64>,
    Query(q): Query<UploadQuery>,
    mut form: Multipart,
) -> Result<JsonResp<Value>, AppError> {
    /// Best-effort rollback of one half-finished upload: removes the temp file, then
    /// deletes the placeholder row. Failures of either step are ignored.
    async fn discard_partial(s: &AppState, tmp_path: &std::path::Path, attachment_id: i64) {
        let _ = tokio::fs::remove_file(tmp_path).await;
        let conn = s.db.lock().unwrap();
        let _ = conn.execute(
            "DELETE FROM attachments WHERE id = ?1",
            params![attachment_id],
        );
    }

    // A missing, empty or whitespace-only role means "no role".
    let requested_role = q.role.as_deref().map(str::trim).filter(|r| !r.is_empty());
    let role: Option<String> = match requested_role {
        Some(r @ ("chord" | "numbered" | "staff")) => Some(r.to_owned()),
        Some(other) => {
            return Err(AppError::bad_request(format!(
                "unsupported role '{other}', expect one of: chord / numbered / staff"
            )));
        }
        None => None,
    };

    // Refuse to attach anything to a piece that does not exist.
    let piece_exists = {
        let conn = s.db.lock().unwrap();
        conn.query_row(
            "SELECT 1 FROM pieces WHERE id = ?1",
            params![piece_id],
            |_| Ok(true),
        )
        .optional()?
        .unwrap_or(false)
    };
    if !piece_exists {
        return Err(AppError::NotFound);
    }

    let mut created: Vec<Value> = Vec::new();
    loop {
        let next = form
            .next_field()
            .await
            .map_err(|e| AppError::bad_request(format!("multipart: {e}")))?;
        let Some(mut field) = next else { break };

        let filename = field
            .file_name()
            .map_or_else(|| "untitled".to_string(), str::to_owned);
        let mime = field
            .content_type()
            .map_or_else(|| "application/octet-stream".to_string(), str::to_owned);
        let Some(kind) = classify(&mime) else {
            return Err(AppError::bad_request(format!(
                "unsupported mime '{mime}' for '{filename}'"
            )));
        };

        // Reserve a row first to obtain the attachment id: the blob is named after the
        // id, so the id alone determines its path.
        let attachment_id = {
            let conn = s.db.lock().unwrap();
            conn.execute(
                "INSERT INTO attachments (piece_id, kind, role, mime, filename, size_bytes, sort_order)\n\
                 VALUES (?1, ?2, ?3, ?4, ?5, 0,\n\
                 COALESCE((SELECT MAX(sort_order) FROM attachments WHERE piece_id = ?1), 0) + 1)",
                params![piece_id, kind, role, mime, filename],
            )?;
            conn.last_insert_rowid()
        };

        // Stream into `<id>.tmp`, then rename it to `<id>` once the write succeeded.
        let final_path = s.blobs_dir.join(attachment_id.to_string());
        let tmp_path = s.blobs_dir.join(format!("{attachment_id}.tmp"));

        let written: usize = match stream_to_file(&mut field, &tmp_path).await {
            Ok(byte_count) => byte_count,
            Err(e) => {
                discard_partial(&s, &tmp_path, attachment_id).await;
                return Err(e);
            }
        };
        if let Err(e) = tokio::fs::rename(&tmp_path, &final_path).await {
            discard_partial(&s, &tmp_path, attachment_id).await;
            return Err(AppError::Io(e));
        }

        // Replace the placeholder size with the real byte count.
        {
            let conn = s.db.lock().unwrap();
            conn.execute(
                "UPDATE attachments SET size_bytes = ?1 WHERE id = ?2",
                params![written as i64, attachment_id],
            )?;
        }

        created.push(json!({
            "id": attachment_id,
            "kind": kind,
            "role": role,
            "mime": mime,
            "filename": filename,
            "size_bytes": written,
        }));
    }

    if created.is_empty() {
        return Err(AppError::bad_request("no files uploaded"));
    }
    Ok(JsonResp(json!({ "attachments": created })))
}
/// Drains one multipart field chunk by chunk into a newly created file at `path`.
///
/// Returns the number of bytes written. The upload is rejected with a 400 as soon as
/// the running total exceeds `SINGLE_FILE_BYTES`; whatever was written up to that point
/// stays on disk, so cleaning up `path` is left to the caller.
///
/// # Errors
/// - `AppError::BadRequest` when reading the field fails or the size limit is exceeded.
/// - `AppError::Io` when creating, writing, flushing or syncing the file fails.
async fn stream_to_file(
    field: &mut axum::extract::multipart::Field<'_>,
    path: &std::path::Path,
) -> Result<usize, AppError> {
    let mut out = tokio::fs::File::create(path).await.map_err(AppError::Io)?;
    let mut written = 0usize;

    loop {
        let next = field
            .chunk()
            .await
            .map_err(|e| AppError::bad_request(format!("upload read: {e}")))?;
        let Some(bytes) = next else { break };

        // Count first and check the limit before writing the chunk.
        written += bytes.len();
        if written > SINGLE_FILE_BYTES {
            return Err(AppError::bad_request(format!(
                "single file exceeds {SINGLE_FILE_BYTES} bytes"
            )));
        }
        out.write_all(&bytes).await.map_err(AppError::Io)?;
    }

    // Flush the handle and sync to disk before reporting success.
    out.flush().await.map_err(AppError::Io)?;
    out.sync_all().await.map_err(AppError::Io)?;
    Ok(written)
}
/// `GET /api/attachments/:id` — Range-aware 下载。
async fn get_attachment(
State(s): State<AppState>,
Path(id): Path<i64>,
req: Request<Body>,
) -> Result<Response, AppError> {
let row: Option<(String, String, String)> = {
let conn = s.db.lock().unwrap();
conn.query_row(
"SELECT mime, filename, kind FROM attachments WHERE id = ?1",
params![id],
|r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
)
.optional()?
};
let (mime, filename, _kind) = row.ok_or(AppError::NotFound)?;
let path = s.blobs_dir.join(id.to_string());
let mime_hv: header::HeaderValue = mime
.parse()
.unwrap_or_else(|_| header::HeaderValue::from_static("application/octet-stream"));
let svc = tower_http::services::ServeFile::new(&path);
let mut resp = svc
.oneshot(req)
.await
.map_err(|e| AppError::Io(std::io::Error::other(e.to_string())))?
.into_response();
// 强一些的缓存头,video 拖动友好
resp.headers_mut()
.insert(header::CACHE_CONTROL, header::HeaderValue::from_static("private, max-age=3600"));
resp.headers_mut().insert(header::CONTENT_TYPE, mime_hv);
if let Ok(disp) = format!(
"inline; filename*=UTF-8''{}",
percent_encode(&filename)
)
.parse()
{
resp.headers_mut().insert(header::CONTENT_DISPOSITION, disp);
}
Ok(resp)
}
/// Deletes one attachment row and then, best-effort, its blob file.
///
/// Responds with `{"ok": true}`, or 404 when no row with this id existed. A failure to
/// remove the blob file is ignored.
async fn delete_attachment(
    State(s): State<AppState>,
    Path(id): Path<i64>,
) -> Result<JsonResp<Value>, AppError> {
    // The lock guard is a temporary of this statement, so it is released before the
    // await below.
    let deleted_rows = s
        .db
        .lock()
        .unwrap()
        .execute("DELETE FROM attachments WHERE id = ?1", params![id])?;
    if deleted_rows == 0 {
        return Err(AppError::NotFound);
    }

    let blob_path = s.blobs_dir.join(id.to_string());
    let _ = tokio::fs::remove_file(blob_path).await;
    Ok(JsonResp(json!({ "ok": true })))
}
// ---------- helpers ----------
/// Maps a mime type to the attachment kind stored in the database.
///
/// Parameters after `;` are ignored and the comparison is ASCII case-insensitive.
/// Returns `"video"`, `"audio"` or `"image"` for the matching `type/` prefix, `"pdf"`
/// for exactly `application/pdf`, and `None` for everything else.
fn classify(mime: &str) -> Option<&'static str> {
    // (required prefix, resulting kind)
    const FAMILIES: [(&str, &str); 3] = [
        ("video/", "video"),
        ("audio/", "audio"),
        ("image/", "image"),
    ];

    let essence = mime
        .split(';')
        .next()
        .unwrap_or("")
        .trim()
        .to_ascii_lowercase();

    if essence == "application/pdf" {
        return Some("pdf");
    }
    FAMILIES
        .iter()
        .find(|entry| essence.starts_with(entry.0))
        .map(|entry| entry.1)
}
/// Percent-encodes `s` byte by byte.
///
/// ASCII letters, digits and `-` `.` `_` `~` are copied through unchanged; every other
/// byte of the UTF-8 encoding becomes `%XX` with uppercase hex digits.
fn percent_encode(s: &str) -> String {
    const HEX_UPPER: &[u8; 16] = b"0123456789ABCDEF";

    let mut encoded = String::with_capacity(s.len());
    for &byte in s.as_bytes() {
        let keep_as_is =
            byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'.' | b'_' | b'~');
        if keep_as_is {
            encoded.push(char::from(byte));
        } else {
            // High nibble first, then low nibble.
            encoded.push('%');
            encoded.push(char::from(HEX_UPPER[usize::from(byte >> 4)]));
            encoded.push(char::from(HEX_UPPER[usize::from(byte & 0x0F)]));
        }
    }
    encoded
}
// ---------- error type ----------
/// Error type of the handlers in this file; its `IntoResponse` impl turns each variant
/// into an HTTP response.
enum AppError {
    /// Invalid client input; answered with 400 and the message as the body.
    BadRequest(String),
    /// The requested row does not exist; answered with 404 `not found`.
    NotFound,
    /// A sqlite error; logged and answered with 500 `db error` (details are not sent).
    Db(rusqlite::Error),
    /// An I/O error; logged and answered with 500 `io error` (details are not sent).
    Io(std::io::Error),
    /// A chord sidecar failure; logged as a warning and answered with 502 and the
    /// message as the body.
    Sidecar(String),
}
impl AppError {
fn bad_request(msg: impl Into<String>) -> Self {
Self::BadRequest(msg.into())
}
fn sidecar(msg: impl Into<String>) -> Self {
Self::Sidecar(msg.into())
}
}
impl From<rusqlite::Error> for AppError {
fn from(e: rusqlite::Error) -> Self {
Self::Db(e)
}
}
impl IntoResponse for AppError {
fn into_response(self) -> Response {
match self {
Self::BadRequest(msg) => (StatusCode::BAD_REQUEST, msg).into_response(),
Self::NotFound => (StatusCode::NOT_FOUND, "not found").into_response(),
Self::Db(e) => {
tracing::error!(error = %e, "sqlite error");
(StatusCode::INTERNAL_SERVER_ERROR, "db error").into_response()
}
Self::Io(e) => {
tracing::error!(error = %e, "io error");
(StatusCode::INTERNAL_SERVER_ERROR, "io error").into_response()
}
Self::Sidecar(msg) => {
tracing::warn!(error = %msg, "chord sidecar");
(StatusCode::BAD_GATEWAY, msg).into_response()
}
}
}
}