From 35ac15dd846a6988090bd8dac8eed9f0598f0ed7 Mon Sep 17 00:00:00 2001 From: boring_nick Date: Thu, 22 Jun 2023 20:54:01 +0300 Subject: [PATCH] implement admin api --- docs/CONFIG.md | 6 +++-- src/bot.rs | 55 ++++++++++++++++++++++++++++++++++++++--- src/config.rs | 2 ++ src/main.rs | 7 ++++-- src/web/admin.rs | 64 ++++++++++++++++++++++++++++++++++++++++++++++++ src/web/mod.rs | 24 +++++++++++++++--- 6 files changed, 147 insertions(+), 11 deletions(-) create mode 100644 src/web/admin.rs diff --git a/docs/CONFIG.md b/docs/CONFIG.md index 50ac435..eba60d7 100644 --- a/docs/CONFIG.md +++ b/docs/CONFIG.md @@ -14,6 +14,7 @@ Available options: - `clientSecret` (string): Twitch client secret. - `admins` (array of strings): List of usernames who are allowed to use administration commands. - `optOut` (object of strings: booleans): List of user ids who opted out from being logged. +- `adminAPIKey` (string): API key for admin requests Example config: ```json @@ -27,6 +28,7 @@ Example config: "clientID": "id", "clientSecret": "secret", "admins": [], - "optOut": {} + "optOut": {}, + "adminAPIKey": "verysecurekey" } -``` \ No newline at end of file +``` diff --git a/src/bot.rs b/src/bot.rs index c0470aa..ec5eee0 100644 --- a/src/bot.rs +++ b/src/bot.rs @@ -9,7 +9,10 @@ use chrono::Utc; use lazy_static::lazy_static; use prometheus::{register_int_counter_vec, IntCounterVec}; use std::{borrow::Cow, time::Duration}; -use tokio::{sync::mpsc::Sender, time::sleep}; +use tokio::{ + sync::mpsc::{Receiver, Sender}, + time::sleep, +}; use tracing::{debug, error, info, trace}; use twitch_irc::{ login::LoginCredentials, @@ -22,6 +25,12 @@ const CHANENLS_REFETCH_RETRY_INTERVAL_SECONDS: u64 = 5; type TwitchClient = TwitchIRCClient; +#[derive(Debug)] +pub enum BotMessage { + JoinChannels(Vec), + PartChannels(Vec), +} + lazy_static! { static ref MESSAGES_RECEIVED_COUNTERS: IntCounterVec = register_int_counter_vec!( "rustlog_messages_received", @@ -38,11 +47,13 @@ pub async fn run( app: App, writer_tx: Sender>, shutdown_rx: ShutdownRx, + command_rx: Receiver, ) { let bot = Bot::new(app, writer_tx); - bot.run(login_credentials, shutdown_rx).await; + bot.run(login_credentials, shutdown_rx, command_rx).await; } +#[derive(Clone)] struct Bot { app: App, writer_tx: Sender>, @@ -53,7 +64,12 @@ impl Bot { Self { app, writer_tx } } - pub async fn run(self, login_credentials: C, mut shutdown_rx: ShutdownRx) { + pub async fn run( + self, + login_credentials: C, + mut shutdown_rx: ShutdownRx, + mut command_rx: Receiver, + ) { let client_config = ClientConfig::new_simple(login_credentials); let (mut receiver, client) = TwitchIRCClient::::new(client_config); @@ -83,6 +99,39 @@ impl Bot { } }); + let bot = self.clone(); + let msg_client = client.clone(); + tokio::spawn(async move { + while let Some(msg) = command_rx.recv().await { + match msg { + BotMessage::JoinChannels(channels) => { + if let Err(err) = bot + .update_channels( + &msg_client, + &channels.iter().map(String::as_str).collect::>(), + ChannelAction::Join, + ) + .await + { + error!("Could not join channels: {err}"); + } + } + BotMessage::PartChannels(channels) => { + if let Err(err) = bot + .update_channels( + &msg_client, + &channels.iter().map(String::as_str).collect::>(), + ChannelAction::Part, + ) + .await + { + error!("Could not join channels: {err}"); + } + } + } + } + }); + loop { tokio::select! { Some(msg) = receiver.recv() => { diff --git a/src/config.rs b/src/config.rs index c972dad..4d8b890 100644 --- a/src/config.rs +++ b/src/config.rs @@ -25,6 +25,8 @@ pub struct Config { pub admins: Vec, #[serde(default)] pub opt_out: DashMap, + #[serde(rename = "adminAPIKey")] + pub admin_api_key: Option, } impl Config { diff --git a/src/main.rs b/src/main.rs index 818f34c..c867012 100644 --- a/src/main.rs +++ b/src/main.rs @@ -26,7 +26,7 @@ use std::{ }; use tokio::{ signal::unix::{signal, SignalKind}, - sync::watch, + sync::{mpsc, watch}, time::timeout, }; use tracing::{debug, info}; @@ -101,14 +101,17 @@ async fn run(config: Config, db: clickhouse::Client) -> anyhow::Result<()> { optout_codes: Arc::default(), }; + let (bot_tx, bot_rx) = mpsc::channel(1); + let login_credentials = StaticLoginCredentials::anonymous(); let mut bot_handle = tokio::spawn(bot::run( login_credentials, app.clone(), writer_tx, shutdown_rx.clone(), + bot_rx, )); - let mut web_handle = tokio::spawn(web::run(app, shutdown_rx.clone())); + let mut web_handle = tokio::spawn(web::run(app, shutdown_rx.clone(), bot_tx)); tokio::select! { _ = shutdown_rx.changed() => { diff --git a/src/web/admin.rs b/src/web/admin.rs new file mode 100644 index 0000000..7f9e62a --- /dev/null +++ b/src/web/admin.rs @@ -0,0 +1,64 @@ +use crate::{app::App, bot::BotMessage, error::Error}; +use axum::{ + extract::State, + http::Request, + middleware::Next, + response::{IntoResponse, Response}, + Extension, Json, +}; +use reqwest::StatusCode; +use schemars::JsonSchema; +use serde::Deserialize; +use tokio::sync::mpsc::Sender; + +pub async fn admin_auth( + app: State, + request: Request, + next: Next, +) -> Result { + if let Some(admin_key) = &app.config.admin_api_key { + if request + .headers() + .get("X-Api-Key") + .and_then(|value| value.to_str().ok()) + == Some(admin_key) + { + let response = next.run(request).await; + return Ok(response); + } + } + + Err((StatusCode::FORBIDDEN, "No, I don't think so")) +} + +#[derive(Deserialize, JsonSchema)] +pub struct ChannelsRequest { + /// List of channel ids + pub channels: Vec, +} + +pub async fn add_channels( + Extension(bot_tx): Extension>, + app: State, + Json(ChannelsRequest { channels }): Json, +) -> Result<(), Error> { + let users = app.get_users(channels, vec![]).await?; + let names = users.into_values().collect(); + + bot_tx.send(BotMessage::JoinChannels(names)).await.unwrap(); + + Ok(()) +} + +pub async fn remove_channels( + Extension(bot_tx): Extension>, + app: State, + Json(ChannelsRequest { channels }): Json, +) -> Result<(), Error> { + let users = app.get_users(channels, vec![]).await?; + let names = users.into_values().collect(); + + bot_tx.send(BotMessage::PartChannels(names)).await.unwrap(); + + Ok(()) +} diff --git a/src/web/mod.rs b/src/web/mod.rs index b84fbe8..d27b089 100644 --- a/src/web/mod.rs +++ b/src/web/mod.rs @@ -1,29 +1,31 @@ +mod admin; mod frontend; mod handlers; mod responders; pub mod schema; mod trace_layer; -use crate::{app::App, ShutdownRx}; +use crate::{app::App, bot::BotMessage, web::admin::admin_auth, ShutdownRx}; use aide::{ axum::{ - routing::{get, get_with, post}, + routing::{get, get_with, post, post_with}, ApiRouter, IntoApiResponse, }, openapi::OpenApi, redoc::Redoc, }; -use axum::{response::IntoResponse, Extension, Json, ServiceExt}; +use axum::{middleware, response::IntoResponse, Extension, Json, ServiceExt}; use prometheus::TextEncoder; use std::{ net::{AddrParseError, SocketAddr}, str::FromStr, sync::Arc, }; +use tokio::sync::mpsc::Sender; use tower_http::{cors::CorsLayer, normalize_path::NormalizePath, trace::TraceLayer}; use tracing::{debug, info}; -pub async fn run(app: App, mut shutdown_rx: ShutdownRx) { +pub async fn run(app: App, mut shutdown_rx: ShutdownRx, bot_tx: Sender) { aide::gen::on_error(|error| { panic!("Could not generate docs: {error}"); }); @@ -37,7 +39,21 @@ pub async fn run(app: App, mut shutdown_rx: ShutdownRx) { let mut api = OpenApi::default(); + let admin_routes = ApiRouter::new() + .api_route( + "/channels", + post_with(admin::add_channels, |op| { + op.tag("Admin").description("Join the specified channels") + }) + .delete_with(admin::remove_channels, |op| { + op.tag("Admin").description("Leave the specified channels") + }), + ) + .route_layer(middleware::from_fn_with_state(app.clone(), admin_auth)) + .layer(Extension(bot_tx)); + let app = ApiRouter::new() + .nest("/admin", admin_routes) .api_route( "/channels", get_with(handlers::get_channels, |op| {