Skip to content

some improvements #477

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion examples/chatbot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ edition = "2021"

[dependencies]
twitch_api = { workspace = true, features = ["eventsub", "helix", "reqwest"] }
clap = { version = "4.5.26", features = ["derive", "env"] }
twitch_oauth2 = { workspace = true, features = ["client"] }
twitch_types = { workspace = true, features = ["time"] }
clap = { version = "4.5.26", features = ["derive", "env"] }
dotenvy = "0.15.7"
color-eyre = "0.6.3"
tracing = "0.1.41"
Expand All @@ -20,3 +21,4 @@ reqwest = "0.12.12"
eyre = "0.6.12"
url.workspace = true
futures = "0.3.31"
time = { version = "0.3.37", features = ["formatting"] }
200 changes: 150 additions & 50 deletions examples/chatbot/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,44 @@ use std::sync::Arc;
use clap::Parser;
use eyre::WrapErr as _;
use tokio::sync::Mutex;

use twitch_api::{
client::ClientDefault,
eventsub::{self, Event, Message, Payload},
HelixClient,
};
use twitch_oauth2::{Scope, TwitchToken as _, UserToken};

/// The scopes we need for the bot.
const SCOPES: &[Scope] = &[Scope::UserReadChat, Scope::UserWriteChat];

/// How often we should check if the token is still valid.
const TOKEN_VALIDATION_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30);
/// The threshold at which we should refresh the token before expiration.
///
/// Only checked every [TOKEN_VALIDATION_INTERVAL] seconds
const TOKEN_EXPIRATION_THRESHOLD: std::time::Duration = std::time::Duration::from_secs(60);

#[derive(Parser, Debug, Clone)]
#[clap(about, version)]
pub struct Cli {
/// Client ID of twitch application
#[clap(long, env, hide_env = true)]
pub client_id: twitch_oauth2::ClientId,
#[clap(long, env, hide_env = true)]
pub broadcaster_login: twitch_api::types::UserName,
/// Chat to connect to, can take multiple values separated by commas
#[clap(long, env, value_delimiter = ',', hide_env = true)]
pub broadcaster_login: Vec<twitch_api::types::UserName>,
/// Path to config file
#[clap(long, default_value = concat!(env!("CARGO_MANIFEST_DIR"), "/config.toml"))]
pub config: std::path::PathBuf,
/// Path to token file
#[clap(long, default_value = concat!(env!("CARGO_MANIFEST_DIR"), "/auth.toml"))]
pub auth: std::path::PathBuf,
}

#[derive(serde_derive::Serialize, serde_derive::Deserialize, Debug)]
pub struct Config {
#[serde(default)]
command: Vec<Command>,
}

Expand Down Expand Up @@ -57,37 +73,44 @@ async fn main() -> Result<(), eyre::Report> {
ClientDefault::default_client_with_name(Some("my_chatbot".parse()?))?,
);

// First we need to get a token, preferably you'd also store this information somewhere safe to reuse when restarting the application.
// For now we'll just get a new token every time the application starts.
// One way to store the token is to store the access_token and refresh_token in a file and load it when the application starts with
// `twitch_oauth2::UserToken::from_existing`
let mut builder = twitch_oauth2::tokens::DeviceUserTokenBuilder::new(
opts.client_id.clone(),
vec![Scope::UserReadChat, Scope::UserWriteChat],
);
let code = builder.start(&client).await?;
println!("Please go to: {}", code.verification_uri);
let token = builder.wait_for_code(&client, tokio::time::sleep).await?;

let Some(twitch_api::helix::users::User {
id: broadcaster, ..
}) = client
.get_user_from_login(&opts.broadcaster_login, &token)
.await?
else {
eyre::bail!(
"No broadcaster found with login: {}",
opts.broadcaster_login
// Get an user access token.
// For this example we store the token in a file, but you should probably store it in a database or similar.
// If there is no token saved, we use Device Code Flow to get a token.
// This flow works best with public client type applications.
// If you have a confidential client type application you should use `UserTokenBuilder` for OAuth authorization code flow.
let token = if let Some(token) = load_token(&opts.auth, &client).await? {
token
} else {
let mut builder = twitch_oauth2::tokens::DeviceUserTokenBuilder::new(
opts.client_id.clone(),
SCOPES.to_vec(),
);
let code = builder.start(&client).await?;
println!("Please go to: {}", code.verification_uri);
builder.wait_for_code(&client, tokio::time::sleep).await?
};
save_token(&token, &opts.auth)?;
let token = Arc::new(Mutex::new(token));

// Get the broadcaster ids from the logins.
let mut broadcasters = vec![];
for login in opts.broadcaster_login.iter() {
if let Some(twitch_api::helix::users::User { id, .. }) =
client.get_user_from_login(login, &token).await?
{
broadcasters.push(id);
} else {
eyre::bail!("No broadcaster found with login: {}", login);
}
}

// Create the bot and start it.
let bot = Bot {
opts,
client,
token,
config,
broadcaster,
broadcasters,
};
bot.start().await?;
Ok(())
Expand All @@ -98,10 +121,12 @@ pub struct Bot {
pub client: HelixClient<'static, reqwest::Client>,
pub token: Arc<Mutex<twitch_oauth2::UserToken>>,
pub config: Config,
pub broadcaster: twitch_api::types::UserId,
pub broadcasters: Vec<twitch_api::types::UserId>,
}

impl Bot {
/// Start the bot. This will connect to the chat and start handling for events with [Bot::handle_event].
/// This will also start a task that will refresh the token if it's about to expire and check if it's still valid.
pub async fn start(&self) -> Result<(), eyre::Report> {
// To make a connection to the chat we need to use a websocket connection.
// This is a wrapper for the websocket connection that handles the reconnects and handles all messages from eventsub.
Expand All @@ -110,27 +135,18 @@ impl Bot {
token: self.token.clone(),
client: self.client.clone(),
connect_url: twitch_api::TWITCH_EVENTSUB_WEBSOCKET_URL.clone(),
chats: vec![self.broadcaster.clone()],
chats: self.broadcasters.clone(),
};
let refresh_token = async move {
let token = self.token.clone();
let client = self.client.clone();
// We check constantly if the token is valid.
// We also need to refresh the token if it's about to be expired.
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
let mut interval = tokio::time::interval(TOKEN_VALIDATION_INTERVAL);
loop {
interval.tick().await;
let mut token = token.lock().await;
if token.expires_in() < std::time::Duration::from_secs(60) {
token
.refresh_token(&self.client)
.await
.wrap_err("couldn't refresh token")?;
}
token
.validate_token(&client)
.await
.wrap_err("couldn't validate token")?;
refresh_and_validate_token(&mut token, &client, &self.opts).await?;
}
#[allow(unreachable_code)]
Ok(())
Expand All @@ -140,38 +156,60 @@ impl Bot {
Ok(())
}

/// Handle chat messages, if they start with `!` send it to [Bot::command].
async fn handle_chat_message(
&self,
token: tokio::sync::MutexGuard<'_, UserToken>,
payload: eventsub::channel::ChannelChatMessageV1Payload,
subscription: eventsub::EventSubscriptionInformation<
eventsub::channel::ChannelChatMessageV1,
>,
) -> Result<(), eyre::Error> {
if let Some(command) = payload.message.text.strip_prefix("!") {
let mut split_whitespace = command.split_whitespace();
let command = split_whitespace.next().unwrap();
let rest = split_whitespace.next();

self.command(&payload, &subscription, command, rest, &token)
.await?;
}
Ok(())
}

/// Handle all eventsub events.
/// We print the message to the console and if it's a chat message we send it to [Bot::handle_chat_message].
/// If there's an event you want to listen to you should first add it to [websocket::ChatWebsocketClient::process_welcome_message] and then handle it here.
async fn handle_event(
&self,
event: Event,
timestamp: twitch_api::types::Timestamp,
) -> Result<(), eyre::Report> {
let token = self.token.lock().await;
let time_format = time::format_description::parse("[hour]:[minute]:[second]")?;
match event {
Event::ChannelChatMessageV1(Payload {
message: Message::Notification(payload),
subscription,
..
}) => {
println!(
"[{}] {}: {}",
timestamp, payload.chatter_user_name, payload.message.text
"[{}] #{} {}: {}",
timestamp.to_utc().format(&time_format).unwrap(),
payload.broadcaster_user_login,
payload.chatter_user_name,
payload.message.text
);
if let Some(command) = payload.message.text.strip_prefix("!") {
let mut split_whitespace = command.split_whitespace();
let command = split_whitespace.next().unwrap();
let rest = split_whitespace.next();

self.command(&payload, &subscription, command, rest, &token)
.await?;
}

self.handle_chat_message(token, payload, subscription)
.await?;
}
Event::ChannelChatNotificationV1(Payload {
message: Message::Notification(payload),
..
}) => {
println!(
"[{}] {}: {}",
timestamp,
"[{}] [Event] {}: {}",
timestamp.to_utc().format(&time_format).unwrap(),
match &payload.chatter {
eventsub::channel::chat::notification::Chatter::Chatter {
chatter_user_name: user,
Expand Down Expand Up @@ -206,7 +244,7 @@ impl Bot {
&payload.message_id,
response
.response
.replace("{user}", &payload.chatter_user_name.as_str())
.replace("{user}", payload.chatter_user_name.as_str())
.as_str(),
token,
)
Expand All @@ -215,3 +253,65 @@ impl Bot {
Ok(())
}
}

async fn refresh_and_validate_token(
token: &mut UserToken,
client: &HelixClient<'_, reqwest::Client>,
opts: &Cli,
) -> Result<(), eyre::Report> {
if token.expires_in() < TOKEN_EXPIRATION_THRESHOLD {
tracing::info!("refreshed token");
token
.refresh_token(client)
.await
.wrap_err("couldn't refresh token")?;
save_token(token, &opts.auth)?;
}
token
.validate_token(client)
.await
.wrap_err("couldn't validate token")?;
Ok(())
}

/// Used to save the token to a file
#[derive(serde_derive::Serialize, serde_derive::Deserialize)]
struct SavedToken {
access_token: twitch_oauth2::AccessToken,
refresh_token: twitch_oauth2::RefreshToken,
}

// you should probably replace this with something more robust
#[cfg(debug_assertions)]
fn save_token(
token: &twitch_oauth2::UserToken,
save_path: &std::path::Path,
) -> Result<(), eyre::Report> {
let token = SavedToken {
access_token: token.access_token.clone(),
refresh_token: token.refresh_token.clone().unwrap(),
};
let text = toml::to_string(&token)?;
std::fs::write(save_path, text)?;
Ok(())
}

#[cfg(debug_assertions)]
async fn load_token(
path: &std::path::Path,
client: &HelixClient<'_, reqwest::Client>,
) -> Result<Option<twitch_oauth2::UserToken>, eyre::Report> {
let Some(text) = std::fs::read_to_string(path).ok() else {
return Ok(None);
};
let token: SavedToken = toml::from_str(&text)?;
Ok(Some(
twitch_oauth2::UserToken::from_existing(
client,
token.access_token,
token.refresh_token,
None,
)
.await?,
))
}
Loading