diff --git a/Cargo.lock b/Cargo.lock index 1510034343..be8d242b6f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -603,6 +603,7 @@ dependencies = [ "reqwest", "serde", "serde_derive", + "time 0.3.37", "tokio", "tokio-tungstenite 0.26.1", "toml", @@ -610,6 +611,7 @@ dependencies = [ "tracing-subscriber", "twitch_api", "twitch_oauth2", + "twitch_types", "url", ] @@ -2958,9 +2960,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.36" +version = "0.3.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5dfd88e563464686c916c7e46e623e520ddc6d79fa6641390f2e3fa86e83e885" +checksum = "35e7868883861bd0e56d9ac6efcaaca0d6d5d82a2a7ec8209ff492c07cf37b21" dependencies = [ "deranged", "itoa", @@ -2968,7 +2970,7 @@ dependencies = [ "powerfmt", "serde", "time-core", - "time-macros 0.2.18", + "time-macros 0.2.19", ] [[package]] @@ -2989,9 +2991,9 @@ dependencies = [ [[package]] name = "time-macros" -version = "0.2.18" +version = "0.2.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f252a68540fde3a3877aeea552b832b40ab9a69e318efd078774a01ddee1ccf" +checksum = "2834e6017e3e5e4b9834939793b282bc03b37a3336245fa820e35e233e2a85de" dependencies = [ "num-conv", "time-core", @@ -3494,7 +3496,7 @@ version = "0.4.8" dependencies = [ "serde", "serde_derive", - "time 0.3.36", + "time 0.3.37", ] [[package]] diff --git a/examples/chatbot/Cargo.toml b/examples/chatbot/Cargo.toml index 1c16341964..ff6b2491c1 100644 --- a/examples/chatbot/Cargo.toml +++ b/examples/chatbot/Cargo.toml @@ -5,8 +5,9 @@ edition = "2021" [dependencies] twitch_api = { workspace = true, features = ["eventsub", "helix", "reqwest"] } -clap = { version = "4.5.26", features = ["derive", "env"] } twitch_oauth2 = { workspace = true, features = ["client"] } +twitch_types = { workspace = true, features = ["time"] } +clap = { version = "4.5.26", features = ["derive", "env"] } dotenvy = "0.15.7" color-eyre = "0.6.3" tracing = "0.1.41" @@ -20,3 +21,4 @@ reqwest = "0.12.12" eyre = "0.6.12" url.workspace = true futures = "0.3.31" +time = { version = "0.3.37", features = ["formatting"] } diff --git a/examples/chatbot/src/main.rs b/examples/chatbot/src/main.rs index b00685c507..9ae7bd46b4 100644 --- a/examples/chatbot/src/main.rs +++ b/examples/chatbot/src/main.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use clap::Parser; use eyre::WrapErr as _; use tokio::sync::Mutex; + use twitch_api::{ client::ClientDefault, eventsub::{self, Event, Message, Payload}, @@ -12,21 +13,36 @@ use twitch_api::{ }; use twitch_oauth2::{Scope, TwitchToken as _, UserToken}; +/// The scopes we need for the bot. +const SCOPES: &[Scope] = &[Scope::UserReadChat, Scope::UserWriteChat]; + +/// How often we should check if the token is still valid. +const TOKEN_VALIDATION_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30); +/// The threshold at which we should refresh the token before expiration. +/// +/// Only checked every [TOKEN_VALIDATION_INTERVAL] seconds +const TOKEN_EXPIRATION_THRESHOLD: std::time::Duration = std::time::Duration::from_secs(60); + #[derive(Parser, Debug, Clone)] #[clap(about, version)] pub struct Cli { /// Client ID of twitch application #[clap(long, env, hide_env = true)] pub client_id: twitch_oauth2::ClientId, - #[clap(long, env, hide_env = true)] - pub broadcaster_login: twitch_api::types::UserName, + /// Chat to connect to, can take multiple values separated by commas + #[clap(long, env, value_delimiter = ',', hide_env = true)] + pub broadcaster_login: Vec, /// Path to config file #[clap(long, default_value = concat!(env!("CARGO_MANIFEST_DIR"), "/config.toml"))] pub config: std::path::PathBuf, + /// Path to token file + #[clap(long, default_value = concat!(env!("CARGO_MANIFEST_DIR"), "/auth.toml"))] + pub auth: std::path::PathBuf, } #[derive(serde_derive::Serialize, serde_derive::Deserialize, Debug)] pub struct Config { + #[serde(default)] command: Vec, } @@ -57,37 +73,44 @@ async fn main() -> Result<(), eyre::Report> { ClientDefault::default_client_with_name(Some("my_chatbot".parse()?))?, ); - // First we need to get a token, preferably you'd also store this information somewhere safe to reuse when restarting the application. - // For now we'll just get a new token every time the application starts. - // One way to store the token is to store the access_token and refresh_token in a file and load it when the application starts with - // `twitch_oauth2::UserToken::from_existing` - let mut builder = twitch_oauth2::tokens::DeviceUserTokenBuilder::new( - opts.client_id.clone(), - vec![Scope::UserReadChat, Scope::UserWriteChat], - ); - let code = builder.start(&client).await?; - println!("Please go to: {}", code.verification_uri); - let token = builder.wait_for_code(&client, tokio::time::sleep).await?; - - let Some(twitch_api::helix::users::User { - id: broadcaster, .. - }) = client - .get_user_from_login(&opts.broadcaster_login, &token) - .await? - else { - eyre::bail!( - "No broadcaster found with login: {}", - opts.broadcaster_login + // Get an user access token. + // For this example we store the token in a file, but you should probably store it in a database or similar. + // If there is no token saved, we use Device Code Flow to get a token. + // This flow works best with public client type applications. + // If you have a confidential client type application you should use `UserTokenBuilder` for OAuth authorization code flow. + let token = if let Some(token) = load_token(&opts.auth, &client).await? { + token + } else { + let mut builder = twitch_oauth2::tokens::DeviceUserTokenBuilder::new( + opts.client_id.clone(), + SCOPES.to_vec(), ); + let code = builder.start(&client).await?; + println!("Please go to: {}", code.verification_uri); + builder.wait_for_code(&client, tokio::time::sleep).await? }; + save_token(&token, &opts.auth)?; let token = Arc::new(Mutex::new(token)); + // Get the broadcaster ids from the logins. + let mut broadcasters = vec![]; + for login in opts.broadcaster_login.iter() { + if let Some(twitch_api::helix::users::User { id, .. }) = + client.get_user_from_login(login, &token).await? + { + broadcasters.push(id); + } else { + eyre::bail!("No broadcaster found with login: {}", login); + } + } + + // Create the bot and start it. let bot = Bot { opts, client, token, config, - broadcaster, + broadcasters, }; bot.start().await?; Ok(()) @@ -98,10 +121,12 @@ pub struct Bot { pub client: HelixClient<'static, reqwest::Client>, pub token: Arc>, pub config: Config, - pub broadcaster: twitch_api::types::UserId, + pub broadcasters: Vec, } impl Bot { + /// Start the bot. This will connect to the chat and start handling for events with [Bot::handle_event]. + /// This will also start a task that will refresh the token if it's about to expire and check if it's still valid. pub async fn start(&self) -> Result<(), eyre::Report> { // To make a connection to the chat we need to use a websocket connection. // This is a wrapper for the websocket connection that handles the reconnects and handles all messages from eventsub. @@ -110,27 +135,18 @@ impl Bot { token: self.token.clone(), client: self.client.clone(), connect_url: twitch_api::TWITCH_EVENTSUB_WEBSOCKET_URL.clone(), - chats: vec![self.broadcaster.clone()], + chats: self.broadcasters.clone(), }; let refresh_token = async move { let token = self.token.clone(); let client = self.client.clone(); // We check constantly if the token is valid. // We also need to refresh the token if it's about to be expired. - let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(30)); + let mut interval = tokio::time::interval(TOKEN_VALIDATION_INTERVAL); loop { interval.tick().await; let mut token = token.lock().await; - if token.expires_in() < std::time::Duration::from_secs(60) { - token - .refresh_token(&self.client) - .await - .wrap_err("couldn't refresh token")?; - } - token - .validate_token(&client) - .await - .wrap_err("couldn't validate token")?; + refresh_and_validate_token(&mut token, &client, &self.opts).await?; } #[allow(unreachable_code)] Ok(()) @@ -140,12 +156,36 @@ impl Bot { Ok(()) } + /// Handle chat messages, if they start with `!` send it to [Bot::command]. + async fn handle_chat_message( + &self, + token: tokio::sync::MutexGuard<'_, UserToken>, + payload: eventsub::channel::ChannelChatMessageV1Payload, + subscription: eventsub::EventSubscriptionInformation< + eventsub::channel::ChannelChatMessageV1, + >, + ) -> Result<(), eyre::Error> { + if let Some(command) = payload.message.text.strip_prefix("!") { + let mut split_whitespace = command.split_whitespace(); + let command = split_whitespace.next().unwrap(); + let rest = split_whitespace.next(); + + self.command(&payload, &subscription, command, rest, &token) + .await?; + } + Ok(()) + } + + /// Handle all eventsub events. + /// We print the message to the console and if it's a chat message we send it to [Bot::handle_chat_message]. + /// If there's an event you want to listen to you should first add it to [websocket::ChatWebsocketClient::process_welcome_message] and then handle it here. async fn handle_event( &self, event: Event, timestamp: twitch_api::types::Timestamp, ) -> Result<(), eyre::Report> { let token = self.token.lock().await; + let time_format = time::format_description::parse("[hour]:[minute]:[second]")?; match event { Event::ChannelChatMessageV1(Payload { message: Message::Notification(payload), @@ -153,25 +193,23 @@ impl Bot { .. }) => { println!( - "[{}] {}: {}", - timestamp, payload.chatter_user_name, payload.message.text + "[{}] #{} {}: {}", + timestamp.to_utc().format(&time_format).unwrap(), + payload.broadcaster_user_login, + payload.chatter_user_name, + payload.message.text ); - if let Some(command) = payload.message.text.strip_prefix("!") { - let mut split_whitespace = command.split_whitespace(); - let command = split_whitespace.next().unwrap(); - let rest = split_whitespace.next(); - - self.command(&payload, &subscription, command, rest, &token) - .await?; - } + + self.handle_chat_message(token, payload, subscription) + .await?; } Event::ChannelChatNotificationV1(Payload { message: Message::Notification(payload), .. }) => { println!( - "[{}] {}: {}", - timestamp, + "[{}] [Event] {}: {}", + timestamp.to_utc().format(&time_format).unwrap(), match &payload.chatter { eventsub::channel::chat::notification::Chatter::Chatter { chatter_user_name: user, @@ -206,7 +244,7 @@ impl Bot { &payload.message_id, response .response - .replace("{user}", &payload.chatter_user_name.as_str()) + .replace("{user}", payload.chatter_user_name.as_str()) .as_str(), token, ) @@ -215,3 +253,65 @@ impl Bot { Ok(()) } } + +async fn refresh_and_validate_token( + token: &mut UserToken, + client: &HelixClient<'_, reqwest::Client>, + opts: &Cli, +) -> Result<(), eyre::Report> { + if token.expires_in() < TOKEN_EXPIRATION_THRESHOLD { + tracing::info!("refreshed token"); + token + .refresh_token(client) + .await + .wrap_err("couldn't refresh token")?; + save_token(token, &opts.auth)?; + } + token + .validate_token(client) + .await + .wrap_err("couldn't validate token")?; + Ok(()) +} + +/// Used to save the token to a file +#[derive(serde_derive::Serialize, serde_derive::Deserialize)] +struct SavedToken { + access_token: twitch_oauth2::AccessToken, + refresh_token: twitch_oauth2::RefreshToken, +} + +// you should probably replace this with something more robust +#[cfg(debug_assertions)] +fn save_token( + token: &twitch_oauth2::UserToken, + save_path: &std::path::Path, +) -> Result<(), eyre::Report> { + let token = SavedToken { + access_token: token.access_token.clone(), + refresh_token: token.refresh_token.clone().unwrap(), + }; + let text = toml::to_string(&token)?; + std::fs::write(save_path, text)?; + Ok(()) +} + +#[cfg(debug_assertions)] +async fn load_token( + path: &std::path::Path, + client: &HelixClient<'_, reqwest::Client>, +) -> Result, eyre::Report> { + let Some(text) = std::fs::read_to_string(path).ok() else { + return Ok(None); + }; + let token: SavedToken = toml::from_str(&text)?; + Ok(Some( + twitch_oauth2::UserToken::from_existing( + client, + token.access_token, + token.refresh_token, + None, + ) + .await?, + )) +}