Add tokio watch channel to update mpris player state

This commit is contained in:
2025-04-09 17:24:37 +02:00
parent 2c2d8dcb4c
commit df58b0f7ba
2 changed files with 26 additions and 42 deletions

View File

@@ -5,7 +5,7 @@ use axum::{
routing::get, routing::get,
Router, Router,
}; };
use tokio::sync::broadcast::Sender; use tokio::sync::watch::Sender;
use crate::database::Database; use crate::database::Database;
@@ -24,7 +24,7 @@ pub async fn http_serve(database: &Database, mpris_producer: Sender<(String, Str
let app = Router::new() let app = Router::new()
.route("/", get(root)) .route("/", get(root))
.route("/{length}", get(add_rating)) .route("/{user_id}/{rating}", get(add_rating))
.with_state(shared_state); .with_state(shared_state);
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap(); let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
@@ -39,24 +39,19 @@ async fn add_rating(
Path((user_id, rating)): Path<(i64, i64)>, Path((user_id, rating)): Path<(i64, i64)>,
State(shared): State<SharedState>, State(shared): State<SharedState>,
) -> Response { ) -> Response {
// Get the current interpret and track from the broadcast channel let mut mpris_rx = shared.mpris_sender.subscribe();
match shared.mpris_sender.subscribe().recv().await {
Ok((interpret, track)) => { // Get the current interpret and track from the watch channel
// write to db let (interpret, track) = (*mpris_rx.borrow_and_update()).clone();
match shared
.database match shared
.user_add_rating(user_id, &interpret, &track, rating) .database
.await .user_add_rating(user_id, &interpret, &track, rating)
{ .await
Ok(_) => (StatusCode::OK, "Done.").into_response(), {
Err(e) => { Ok(_) => (StatusCode::OK, "Done.").into_response(),
println!("HTTP error: {e}");
(StatusCode::BAD_REQUEST, e.to_string()).into_response()
}
}
}
Err(e) => { Err(e) => {
println!("mpris error: {e}"); println!("HTTP error: {e}");
(StatusCode::BAD_REQUEST, e.to_string()).into_response() (StatusCode::BAD_REQUEST, e.to_string()).into_response()
} }
} }

View File

@@ -3,15 +3,12 @@ mod http_server;
mod player; mod player;
mod userinterface; mod userinterface;
use std::{env::args, sync::Arc}; use std::{env::args, sync::Arc, thread::sleep, time::Duration};
use tokio::sync::broadcast; use tokio::sync::watch;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
// Initialize the mpris player
let player = player::MprisPlayer::new().expect("Could not create player");
// Initialize the database // Initialize the database
let db = Arc::new( let db = Arc::new(
database::Database::new() database::Database::new()
@@ -62,7 +59,8 @@ async fn main() {
} }
// Instantiate the mpris channel // Instantiate the mpris channel
let (mpris_tx, _) = broadcast::channel(32); // Channel size 1 makes sure we always have the correct song in the queue
let (mpris_tx, mut mpris_rx) = watch::channel((String::new(), String::new()));
let mpris_tx_http = mpris_tx.clone(); let mpris_tx_http = mpris_tx.clone();
tokio::spawn(async move { tokio::spawn(async move {
@@ -73,6 +71,8 @@ async fn main() {
println!("Error sending interpret and track: {e}"); println!("Error sending interpret and track: {e}");
} }
} }
// Use the std sleep here to avoid an await which will requires player to be Send.
sleep(Duration::from_millis(10));
} }
}); });
@@ -86,24 +86,13 @@ async fn main() {
let (usernumber, userrating) = userinterface::get_user_rating(&db) let (usernumber, userrating) = userinterface::get_user_rating(&db)
.await .await
.expect("Can not get user input"); .expect("Can not get user input");
let (interpret, track) = (*mpris_rx.borrow_and_update()).clone();
match player.get_interpret_and_track() { if let Err(e) = db
Ok((interpret, song)) => { .user_add_rating(usernumber, &interpret, &track, userrating)
if let Err(e) = db .await
.user_add_rating(usernumber, &interpret, &song, userrating) {
.await eprintln!("{e}");
{
eprintln!("{e}");
}
}
Err(e) => {
eprintln!("{e}");
continue;
}
} }
} }
} }
fn mpris_player_producer() {
//
}