@@ -5,28 +5,44 @@ use std::sync::Arc;
5
5
use clap:: Parser ;
6
6
use eyre:: WrapErr as _;
7
7
use tokio:: sync:: Mutex ;
8
+
8
9
use twitch_api:: {
9
10
client:: ClientDefault ,
10
11
eventsub:: { self , Event , Message , Payload } ,
11
12
HelixClient ,
12
13
} ;
13
14
use twitch_oauth2:: { Scope , TwitchToken as _, UserToken } ;
14
15
16
+ /// The scopes we need for the bot.
17
+ const SCOPES : & [ Scope ] = & [ Scope :: UserReadChat , Scope :: UserWriteChat ] ;
18
+
19
+ /// How often we should check if the token is still valid.
20
+ const TOKEN_VALIDATION_INTERVAL : std:: time:: Duration = std:: time:: Duration :: from_secs ( 30 ) ;
21
+ /// The threshold at which we should refresh the token before expiration.
22
+ ///
23
+ /// Only checked every [TOKEN_VALIDATION_INTERVAL] seconds
24
+ const TOKEN_EXPIRATION_THRESHOLD : std:: time:: Duration = std:: time:: Duration :: from_secs ( 60 ) ;
25
+
15
26
#[ derive( Parser , Debug , Clone ) ]
16
27
#[ clap( about, version) ]
17
28
pub struct Cli {
18
29
/// Client ID of twitch application
19
30
#[ clap( long, env, hide_env = true ) ]
20
31
pub client_id : twitch_oauth2:: ClientId ,
21
- #[ clap( long, env, hide_env = true ) ]
22
- pub broadcaster_login : twitch_api:: types:: UserName ,
32
+ /// Chat to connect to, can take multiple values separated by commas
33
+ #[ clap( long, env, value_delimiter = ',' , hide_env = true ) ]
34
+ pub broadcaster_login : Vec < twitch_api:: types:: UserName > ,
23
35
/// Path to config file
24
36
#[ clap( long, default_value = concat!( env!( "CARGO_MANIFEST_DIR" ) , "/config.toml" ) ) ]
25
37
pub config : std:: path:: PathBuf ,
38
+ /// Path to token file
39
+ #[ clap( long, default_value = concat!( env!( "CARGO_MANIFEST_DIR" ) , "/auth.toml" ) ) ]
40
+ pub auth : std:: path:: PathBuf ,
26
41
}
27
42
28
43
#[ derive( serde_derive:: Serialize , serde_derive:: Deserialize , Debug ) ]
29
44
pub struct Config {
45
+ #[ serde( default ) ]
30
46
command : Vec < Command > ,
31
47
}
32
48
@@ -57,37 +73,44 @@ async fn main() -> Result<(), eyre::Report> {
57
73
ClientDefault :: default_client_with_name ( Some ( "my_chatbot" . parse ( ) ?) ) ?,
58
74
) ;
59
75
60
- // First we need to get a token, preferably you'd also store this information somewhere safe to reuse when restarting the application.
61
- // For now we'll just get a new token every time the application starts.
62
- // One way to store the token is to store the access_token and refresh_token in a file and load it when the application starts with
63
- // `twitch_oauth2::UserToken::from_existing`
64
- let mut builder = twitch_oauth2:: tokens:: DeviceUserTokenBuilder :: new (
65
- opts. client_id . clone ( ) ,
66
- vec ! [ Scope :: UserReadChat , Scope :: UserWriteChat ] ,
67
- ) ;
68
- let code = builder. start ( & client) . await ?;
69
- println ! ( "Please go to: {}" , code. verification_uri) ;
70
- let token = builder. wait_for_code ( & client, tokio:: time:: sleep) . await ?;
71
-
72
- let Some ( twitch_api:: helix:: users:: User {
73
- id : broadcaster, ..
74
- } ) = client
75
- . get_user_from_login ( & opts. broadcaster_login , & token)
76
- . await ?
77
- else {
78
- eyre:: bail!(
79
- "No broadcaster found with login: {}" ,
80
- opts. broadcaster_login
76
+ // Get an user access token.
77
+ // For this example we store the token in a file, but you should probably store it in a database or similar.
78
+ // If there is no token saved, we use Device Code Flow to get a token.
79
+ // This flow works best with public client type applications.
80
+ // If you have a confidential client type application you should use `UserTokenBuilder` for OAuth authorization code flow.
81
+ let token = if let Some ( token) = load_token ( & opts. auth , & client) . await ? {
82
+ token
83
+ } else {
84
+ let mut builder = twitch_oauth2:: tokens:: DeviceUserTokenBuilder :: new (
85
+ opts. client_id . clone ( ) ,
86
+ SCOPES . to_vec ( ) ,
81
87
) ;
88
+ let code = builder. start ( & client) . await ?;
89
+ println ! ( "Please go to: {}" , code. verification_uri) ;
90
+ builder. wait_for_code ( & client, tokio:: time:: sleep) . await ?
82
91
} ;
92
+ save_token ( & token, & opts. auth ) ?;
83
93
let token = Arc :: new ( Mutex :: new ( token) ) ;
84
94
95
+ // Get the broadcaster ids from the logins.
96
+ let mut broadcasters = vec ! [ ] ;
97
+ for login in opts. broadcaster_login . iter ( ) {
98
+ if let Some ( twitch_api:: helix:: users:: User { id, .. } ) =
99
+ client. get_user_from_login ( login, & token) . await ?
100
+ {
101
+ broadcasters. push ( id) ;
102
+ } else {
103
+ eyre:: bail!( "No broadcaster found with login: {}" , login) ;
104
+ }
105
+ }
106
+
107
+ // Create the bot and start it.
85
108
let bot = Bot {
86
109
opts,
87
110
client,
88
111
token,
89
112
config,
90
- broadcaster ,
113
+ broadcasters ,
91
114
} ;
92
115
bot. start ( ) . await ?;
93
116
Ok ( ( ) )
@@ -98,10 +121,12 @@ pub struct Bot {
98
121
pub client : HelixClient < ' static , reqwest:: Client > ,
99
122
pub token : Arc < Mutex < twitch_oauth2:: UserToken > > ,
100
123
pub config : Config ,
101
- pub broadcaster : twitch_api:: types:: UserId ,
124
+ pub broadcasters : Vec < twitch_api:: types:: UserId > ,
102
125
}
103
126
104
127
impl Bot {
128
+ /// Start the bot. This will connect to the chat and start handling for events with [Bot::handle_event].
129
+ /// This will also start a task that will refresh the token if it's about to expire and check if it's still valid.
105
130
pub async fn start ( & self ) -> Result < ( ) , eyre:: Report > {
106
131
// To make a connection to the chat we need to use a websocket connection.
107
132
// This is a wrapper for the websocket connection that handles the reconnects and handles all messages from eventsub.
@@ -110,27 +135,18 @@ impl Bot {
110
135
token : self . token . clone ( ) ,
111
136
client : self . client . clone ( ) ,
112
137
connect_url : twitch_api:: TWITCH_EVENTSUB_WEBSOCKET_URL . clone ( ) ,
113
- chats : vec ! [ self . broadcaster . clone( ) ] ,
138
+ chats : self . broadcasters . clone ( ) ,
114
139
} ;
115
140
let refresh_token = async move {
116
141
let token = self . token . clone ( ) ;
117
142
let client = self . client . clone ( ) ;
118
143
// We check constantly if the token is valid.
119
144
// We also need to refresh the token if it's about to be expired.
120
- let mut interval = tokio:: time:: interval ( tokio :: time :: Duration :: from_secs ( 30 ) ) ;
145
+ let mut interval = tokio:: time:: interval ( TOKEN_VALIDATION_INTERVAL ) ;
121
146
loop {
122
147
interval. tick ( ) . await ;
123
148
let mut token = token. lock ( ) . await ;
124
- if token. expires_in ( ) < std:: time:: Duration :: from_secs ( 60 ) {
125
- token
126
- . refresh_token ( & self . client )
127
- . await
128
- . wrap_err ( "couldn't refresh token" ) ?;
129
- }
130
- token
131
- . validate_token ( & client)
132
- . await
133
- . wrap_err ( "couldn't validate token" ) ?;
149
+ refresh_and_validate_token ( & mut token, & client, & self . opts ) . await ?;
134
150
}
135
151
#[ allow( unreachable_code) ]
136
152
Ok ( ( ) )
@@ -140,38 +156,60 @@ impl Bot {
140
156
Ok ( ( ) )
141
157
}
142
158
159
+ /// Handle chat messages, if they start with `!` send it to [Bot::command].
160
+ async fn handle_chat_message (
161
+ & self ,
162
+ token : tokio:: sync:: MutexGuard < ' _ , UserToken > ,
163
+ payload : eventsub:: channel:: ChannelChatMessageV1Payload ,
164
+ subscription : eventsub:: EventSubscriptionInformation <
165
+ eventsub:: channel:: ChannelChatMessageV1 ,
166
+ > ,
167
+ ) -> Result < ( ) , eyre:: Error > {
168
+ if let Some ( command) = payload. message . text . strip_prefix ( "!" ) {
169
+ let mut split_whitespace = command. split_whitespace ( ) ;
170
+ let command = split_whitespace. next ( ) . unwrap ( ) ;
171
+ let rest = split_whitespace. next ( ) ;
172
+
173
+ self . command ( & payload, & subscription, command, rest, & token)
174
+ . await ?;
175
+ }
176
+ Ok ( ( ) )
177
+ }
178
+
179
+ /// Handle all eventsub events.
180
+ /// We print the message to the console and if it's a chat message we send it to [Bot::handle_chat_message].
181
+ /// If there's an event you want to listen to you should first add it to [websocket::ChatWebsocketClient::process_welcome_message] and then handle it here.
143
182
async fn handle_event (
144
183
& self ,
145
184
event : Event ,
146
185
timestamp : twitch_api:: types:: Timestamp ,
147
186
) -> Result < ( ) , eyre:: Report > {
148
187
let token = self . token . lock ( ) . await ;
188
+ let time_format = time:: format_description:: parse ( "[hour]:[minute]:[second]" ) ?;
149
189
match event {
150
190
Event :: ChannelChatMessageV1 ( Payload {
151
191
message : Message :: Notification ( payload) ,
152
192
subscription,
153
193
..
154
194
} ) => {
155
195
println ! (
156
- "[{}] {}: {}" ,
157
- timestamp, payload. chatter_user_name, payload. message. text
196
+ "[{}] #{} {}: {}" ,
197
+ timestamp. to_utc( ) . format( & time_format) . unwrap( ) ,
198
+ payload. broadcaster_user_login,
199
+ payload. chatter_user_name,
200
+ payload. message. text
158
201
) ;
159
- if let Some ( command) = payload. message . text . strip_prefix ( "!" ) {
160
- let mut split_whitespace = command. split_whitespace ( ) ;
161
- let command = split_whitespace. next ( ) . unwrap ( ) ;
162
- let rest = split_whitespace. next ( ) ;
163
-
164
- self . command ( & payload, & subscription, command, rest, & token)
165
- . await ?;
166
- }
202
+
203
+ self . handle_chat_message ( token, payload, subscription)
204
+ . await ?;
167
205
}
168
206
Event :: ChannelChatNotificationV1 ( Payload {
169
207
message : Message :: Notification ( payload) ,
170
208
..
171
209
} ) => {
172
210
println ! (
173
- "[{}] {}: {}" ,
174
- timestamp,
211
+ "[{}] [Event] {}: {}" ,
212
+ timestamp. to_utc ( ) . format ( & time_format ) . unwrap ( ) ,
175
213
match & payload. chatter {
176
214
eventsub:: channel:: chat:: notification:: Chatter :: Chatter {
177
215
chatter_user_name: user,
@@ -206,7 +244,7 @@ impl Bot {
206
244
& payload. message_id ,
207
245
response
208
246
. response
209
- . replace ( "{user}" , & payload. chatter_user_name . as_str ( ) )
247
+ . replace ( "{user}" , payload. chatter_user_name . as_str ( ) )
210
248
. as_str ( ) ,
211
249
token,
212
250
)
@@ -215,3 +253,65 @@ impl Bot {
215
253
Ok ( ( ) )
216
254
}
217
255
}
256
+
257
+ async fn refresh_and_validate_token (
258
+ token : & mut UserToken ,
259
+ client : & HelixClient < ' _ , reqwest:: Client > ,
260
+ opts : & Cli ,
261
+ ) -> Result < ( ) , eyre:: Report > {
262
+ if token. expires_in ( ) < TOKEN_EXPIRATION_THRESHOLD {
263
+ tracing:: info!( "refreshed token" ) ;
264
+ token
265
+ . refresh_token ( client)
266
+ . await
267
+ . wrap_err ( "couldn't refresh token" ) ?;
268
+ save_token ( token, & opts. auth ) ?;
269
+ }
270
+ token
271
+ . validate_token ( client)
272
+ . await
273
+ . wrap_err ( "couldn't validate token" ) ?;
274
+ Ok ( ( ) )
275
+ }
276
+
277
+ /// Used to save the token to a file
278
+ #[ derive( serde_derive:: Serialize , serde_derive:: Deserialize ) ]
279
+ struct SavedToken {
280
+ access_token : twitch_oauth2:: AccessToken ,
281
+ refresh_token : twitch_oauth2:: RefreshToken ,
282
+ }
283
+
284
+ // you should probably replace this with something more robust
285
+ #[ cfg( debug_assertions) ]
286
+ fn save_token (
287
+ token : & twitch_oauth2:: UserToken ,
288
+ save_path : & std:: path:: Path ,
289
+ ) -> Result < ( ) , eyre:: Report > {
290
+ let token = SavedToken {
291
+ access_token : token. access_token . clone ( ) ,
292
+ refresh_token : token. refresh_token . clone ( ) . unwrap ( ) ,
293
+ } ;
294
+ let text = toml:: to_string ( & token) ?;
295
+ std:: fs:: write ( save_path, text) ?;
296
+ Ok ( ( ) )
297
+ }
298
+
299
+ #[ cfg( debug_assertions) ]
300
+ async fn load_token (
301
+ path : & std:: path:: Path ,
302
+ client : & HelixClient < ' _ , reqwest:: Client > ,
303
+ ) -> Result < Option < twitch_oauth2:: UserToken > , eyre:: Report > {
304
+ let Some ( text) = std:: fs:: read_to_string ( path) . ok ( ) else {
305
+ return Ok ( None ) ;
306
+ } ;
307
+ let token: SavedToken = toml:: from_str ( & text) ?;
308
+ Ok ( Some (
309
+ twitch_oauth2:: UserToken :: from_existing (
310
+ client,
311
+ token. access_token ,
312
+ token. refresh_token ,
313
+ None ,
314
+ )
315
+ . await ?,
316
+ ) )
317
+ }
0 commit comments