rustlog/src/bot.rs

320 lines
9.8 KiB
Rust
Raw Normal View History

2023-05-26 13:55:58 -04:00
use crate::{
app::App,
db::schema::Message,
2023-06-07 12:28:47 -04:00
logs::extract::{extract_channel_and_user_from_raw, extract_raw_timestamp},
2023-05-29 14:40:19 -04:00
ShutdownRx,
2023-05-26 13:55:58 -04:00
};
2023-06-04 01:44:55 -04:00
use anyhow::{anyhow, Context};
2023-05-26 13:55:58 -04:00
use chrono::Utc;
2023-05-29 15:35:45 -04:00
use lazy_static::lazy_static;
use prometheus::{register_int_counter_vec, IntCounterVec};
2023-06-21 12:35:13 -04:00
use std::{borrow::Cow, time::Duration};
2023-06-22 13:54:01 -04:00
use tokio::{
sync::mpsc::{Receiver, Sender},
time::sleep,
};
2023-06-29 10:04:54 -04:00
use tracing::{debug, error, info, log::warn, trace};
use twitch_irc::{
2022-09-22 16:18:14 -03:00
login::LoginCredentials,
message::{AsRawIRC, IRCMessage, ServerMessage},
ClientConfig, SecureTCPTransport, TwitchIRCClient,
};
2022-08-13 03:38:02 -04:00
2023-06-21 12:35:13 -04:00
const CHANNEL_REJOIN_INTERVAL_SECONDS: u64 = 3600;
const CHANENLS_REFETCH_RETRY_INTERVAL_SECONDS: u64 = 5;
type TwitchClient<C> = TwitchIRCClient<SecureTCPTransport, C>;
2022-08-13 03:38:02 -04:00
2023-06-22 13:54:01 -04:00
#[derive(Debug)]
pub enum BotMessage {
JoinChannels(Vec<String>),
PartChannels(Vec<String>),
}
2023-05-29 15:35:45 -04:00
lazy_static! {
static ref MESSAGES_RECEIVED_COUNTERS: IntCounterVec = register_int_counter_vec!(
"rustlog_messages_received",
"How many messages were written",
&["channel_id"]
)
.unwrap();
}
const COMMAND_PREFIX: &str = "!rustlog ";
2023-05-26 13:55:58 -04:00
pub async fn run<C: LoginCredentials>(
login_credentials: C,
2023-06-22 06:29:41 -04:00
app: App,
2023-05-26 13:55:58 -04:00
writer_tx: Sender<Message<'static>>,
2023-05-29 14:40:19 -04:00
shutdown_rx: ShutdownRx,
2023-06-22 13:54:01 -04:00
command_rx: Receiver<BotMessage>,
2023-05-26 13:55:58 -04:00
) {
let bot = Bot::new(app, writer_tx);
2023-06-22 13:54:01 -04:00
bot.run(login_credentials, shutdown_rx, command_rx).await;
}
2023-06-22 13:54:01 -04:00
#[derive(Clone)]
2023-06-21 12:35:13 -04:00
struct Bot {
2023-06-22 06:29:41 -04:00
app: App,
2023-05-26 13:55:58 -04:00
writer_tx: Sender<Message<'static>>,
}
2023-06-21 12:35:13 -04:00
impl Bot {
2023-06-22 06:29:41 -04:00
pub fn new(app: App, writer_tx: Sender<Message<'static>>) -> Bot {
2023-05-26 13:55:58 -04:00
Self { app, writer_tx }
}
2023-06-22 13:54:01 -04:00
pub async fn run<C: LoginCredentials>(
self,
login_credentials: C,
mut shutdown_rx: ShutdownRx,
mut command_rx: Receiver<BotMessage>,
) {
let client_config = ClientConfig::new_simple(login_credentials);
let (mut receiver, client) = TwitchIRCClient::<SecureTCPTransport, C>::new(client_config);
2023-06-21 12:35:13 -04:00
let app = self.app.clone();
let join_client = client.clone();
tokio::spawn(async move {
loop {
let channel_ids = app.config.channels.read().unwrap().clone();
2023-06-21 12:35:13 -04:00
let interval = match app.get_users(Vec::from_iter(channel_ids), vec![]).await {
Ok(users) => {
info!("Joining {} channels", users.len());
for channel_login in users.into_values() {
debug!("Logging channel {channel_login}");
join_client
.join(channel_login)
.expect("Failed to join channel");
2023-05-29 14:40:19 -04:00
}
2023-06-21 12:35:13 -04:00
CHANNEL_REJOIN_INTERVAL_SECONDS
}
2023-06-21 12:35:13 -04:00
Err(err) => {
error!("Could not fetch users list: {err}");
CHANENLS_REFETCH_RETRY_INTERVAL_SECONDS
}
};
sleep(Duration::from_secs(interval)).await;
}
2023-06-21 12:35:13 -04:00
});
2023-06-22 13:54:01 -04:00
let bot = self.clone();
let msg_client = client.clone();
tokio::spawn(async move {
while let Some(msg) = command_rx.recv().await {
match msg {
BotMessage::JoinChannels(channels) => {
if let Err(err) = bot
.update_channels(
&msg_client,
&channels.iter().map(String::as_str).collect::<Vec<_>>(),
ChannelAction::Join,
)
.await
{
error!("Could not join channels: {err}");
}
}
BotMessage::PartChannels(channels) => {
if let Err(err) = bot
.update_channels(
&msg_client,
&channels.iter().map(String::as_str).collect::<Vec<_>>(),
ChannelAction::Part,
)
.await
{
error!("Could not join channels: {err}");
}
}
}
}
});
2023-06-21 12:35:13 -04:00
loop {
tokio::select! {
Some(msg) = receiver.recv() => {
if let Err(e) = self.handle_message(msg, &client).await {
error!("Could not handle message: {e}");
}
}
_ = shutdown_rx.changed() => {
debug!("Shutting down bot task");
break;
}
2022-08-13 03:38:02 -04:00
}
}
}
2022-08-13 03:38:02 -04:00
async fn handle_message<C: LoginCredentials>(
&self,
msg: ServerMessage,
client: &TwitchClient<C>,
) -> anyhow::Result<()> {
2022-09-07 15:36:13 -04:00
if let ServerMessage::Privmsg(privmsg) = &msg {
trace!("Processing message {}", privmsg.message_text);
if let Some(cmd) = privmsg.message_text.strip_prefix(COMMAND_PREFIX) {
2023-06-29 10:04:54 -04:00
if let Err(err) = self
.handle_command(cmd, client, &privmsg.sender.id, &privmsg.sender.login)
.await
{
warn!("Could not handle command {cmd}: {err:#}");
2022-08-13 03:38:02 -04:00
}
}
}
2022-09-22 16:18:14 -03:00
self.write_message(msg).await?;
Ok(())
}
2023-06-29 10:04:54 -04:00
fn check_admin(&self, user_login: &str) -> anyhow::Result<()> {
if self
.app
.config
.admins
.iter()
.any(|login| login == user_login)
{
Ok(())
} else {
Err(anyhow!("User {user_login} is not an admin"))
}
}
2022-09-22 16:18:14 -03:00
async fn write_message(&self, msg: ServerMessage) -> anyhow::Result<()> {
2023-06-09 15:20:16 -04:00
// Ignore
if matches!(msg, ServerMessage::RoomState(_)) {
return Ok(());
}
2022-09-22 16:18:14 -03:00
let irc_message = IRCMessage::from(msg);
2022-09-22 16:18:14 -03:00
if let Some((channel_id, maybe_user_id)) = extract_channel_and_user_from_raw(&irc_message) {
if !channel_id.is_empty() {
MESSAGES_RECEIVED_COUNTERS
.with_label_values(&[channel_id])
.inc();
}
2023-05-29 15:35:45 -04:00
2023-06-07 12:28:47 -04:00
let timestamp = extract_raw_timestamp(&irc_message)
2023-05-26 13:55:58 -04:00
.unwrap_or_else(|| Utc::now().timestamp_millis().try_into().unwrap());
let user_id = maybe_user_id.unwrap_or_default().to_owned();
2023-06-10 07:50:20 -04:00
if self.app.config.opt_out.contains_key(&user_id) {
return Ok(());
}
2023-05-26 13:55:58 -04:00
let message = Message {
channel_id: Cow::Owned(channel_id.to_owned()),
user_id: Cow::Owned(user_id),
timestamp,
raw: Cow::Owned(irc_message.as_raw_irc()),
};
self.writer_tx.send(message).await?;
2022-08-13 03:38:02 -04:00
}
2022-09-22 16:18:14 -03:00
Ok(())
}
async fn handle_command<C: LoginCredentials>(
&self,
cmd: &str,
client: &TwitchClient<C>,
2023-06-04 01:44:55 -04:00
sender_id: &str,
2023-06-29 10:04:54 -04:00
sender_login: &str,
2022-09-22 16:18:14 -03:00
) -> anyhow::Result<()> {
debug!("Processing command {cmd}");
let mut split = cmd.split_whitespace();
if let Some(action) = split.next() {
let args: Vec<&str> = split.collect();
match action {
"join" => {
2023-06-29 10:04:54 -04:00
self.check_admin(sender_login)?;
self.update_channels(client, &args, ChannelAction::Join)
.await?
}
"leave" | "part" => {
2023-06-29 10:04:54 -04:00
self.check_admin(sender_login)?;
self.update_channels(client, &args, ChannelAction::Part)
.await?
}
2023-06-04 01:44:55 -04:00
"optout" => {
self.optout_user(&args, sender_login, sender_id).await?;
2023-06-04 01:44:55 -04:00
}
_ => (),
}
2022-08-13 03:38:02 -04:00
}
2022-09-22 16:18:14 -03:00
Ok(())
2022-08-13 03:38:02 -04:00
}
async fn optout_user(
&self,
args: &[&str],
sender_login: &str,
sender_id: &str,
) -> anyhow::Result<()> {
let arg = args.first().context("No optout code provided")?;
if self.app.optout_codes.remove(*arg).is_some() {
self.app.optout_user(sender_id).await?;
2023-06-14 01:13:08 -04:00
2023-06-04 01:44:55 -04:00
Ok(())
2023-08-30 15:37:48 -04:00
} else if self.check_admin(sender_login).is_ok() {
let user_id = self.app.get_user_id_by_name(arg).await?;
2023-08-30 15:37:48 -04:00
self.app.optout_user(&user_id).await?;
2023-08-30 15:37:48 -04:00
Ok(())
} else {
Err(anyhow!("Invalid optout code"))
2023-06-04 01:44:55 -04:00
}
}
async fn update_channels<C: LoginCredentials>(
&self,
client: &TwitchClient<C>,
channels: &[&str],
action: ChannelAction,
) -> anyhow::Result<()> {
if channels.is_empty() {
return Err(anyhow!("no channels specified"));
}
let channels = self
.app
2022-09-07 15:36:13 -04:00
.get_users(vec![], channels.iter().map(ToString::to_string).collect())
.await?;
{
let mut config_channels = self.app.config.channels.write().unwrap();
for (channel_id, channel_name) in channels {
match action {
ChannelAction::Join => {
info!("Joining channel {channel_name}");
config_channels.insert(channel_id);
client.join(channel_name)?;
}
ChannelAction::Part => {
info!("Parting channel {channel_name}");
config_channels.remove(&channel_id);
client.part(channel_name);
}
}
}
}
2023-06-07 14:03:01 -04:00
self.app.config.save()?;
Ok(())
}
}
enum ChannelAction {
Join,
Part,
}