rustlog/src/main.rs

190 lines
5.0 KiB
Rust

mod app;
mod args;
mod bot;
mod config;
mod db;
mod error;
mod logs;
mod migrator;
mod web;
/// Crate-wide result alias whose error type is this crate's [`error::Error`].
pub type Result<T> = std::result::Result<T, error::Error>;
/// Receiving half of a `()` watch channel. `listen_shutdown` in this file
/// returns a receiver of this same underlying type, which changes once a
/// shutdown signal has been received.
pub type ShutdownRx = watch::Receiver<()>;
use anyhow::{anyhow, Context};
use app::App;
use args::{Args, Command};
use clap::Parser;
use config::Config;
use db::{setup_db, writer::create_writer};
use futures::{future::try_join_all, stream::FuturesUnordered, StreamExt};
use migrator::Migrator;
use mimalloc::MiMalloc;
use std::{
sync::Arc,
time::{Duration, Instant},
};
use tokio::{
signal::unix::{signal, SignalKind},
sync::watch,
time::timeout,
};
use tracing::{debug, info};
use tracing_subscriber::EnvFilter;
use twitch_api2::{
twitch_oauth2::{AppAccessToken, Scope},
HelixClient,
};
use twitch_irc::login::StaticLoginCredentials;
use crate::app::cache::UsersCache;
/// How many seconds `run` waits for the bot, web and writer tasks to finish
/// after a shutdown signal before it gives up and returns an error.
const SHUTDOWN_TIMEOUT_SECONDS: u64 = 8;
/// Installs `MiMalloc` as the global allocator for the whole process.
#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;
/// Program entry point.
///
/// Sets up tracing (filter read from the default environment variable,
/// falling back to `info`), loads the configuration, builds the ClickHouse
/// client, runs the database setup and then dispatches on the parsed
/// command line: without a subcommand the service is started through
/// [`run`]; the `Migrate` subcommand is handed to [`migrate`].
///
/// # Errors
///
/// Returns an error when the configuration cannot be loaded, when the
/// database setup fails ("Could not run DB migrations"), or when the
/// selected mode of operation itself fails.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    tracing_subscriber::fmt()
        .with_env_filter(
            EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")),
        )
        .init();

    let config = Config::load()?;

    let mut db = clickhouse::Client::default()
        .with_url(&config.clickhouse_url)
        .with_database(&config.clickhouse_db);

    // Credentials are optional; only apply the ones that are configured.
    if let Some(user) = &config.clickhouse_username {
        db = db.with_user(user);
    }
    if let Some(password) = &config.clickhouse_password {
        // Must be `with_password`: calling `with_user` here would overwrite
        // the username with the password and leave the password unset.
        db = db.with_password(password);
    }

    let args = Args::parse();

    setup_db(&db).await.context("Could not run DB migrations")?;

    match args.subcommand {
        None => run(config, db).await,
        Some(Command::Migrate {
            source_dir,
            channel_id,
            jobs,
        }) => migrate(db, source_dir, channel_id, jobs).await,
    }
}
/// Runs the service until a shutdown signal arrives or a task dies.
///
/// Starts the shutdown listener, obtains an app token, creates the database
/// writer and spawns the bot and web tasks. It then waits for whichever
/// happens first:
///
/// * the shutdown receiver changes — all three tasks are given
///   [`SHUTDOWN_TIMEOUT_SECONDS`] seconds to finish;
/// * the bot, web or writer task finishes on its own — treated as an error.
///
/// # Errors
///
/// Fails if the token cannot be generated, if the writer cannot be created,
/// if any task exits before shutdown was requested, if joining the tasks
/// fails during shutdown, or if they do not finish within the timeout.
async fn run(config: Config, db: clickhouse::Client) -> anyhow::Result<()> {
    let mut shutdown_rx = listen_shutdown().await;

    let helix_client: HelixClient<reqwest::Client> = HelixClient::default();
    let token = generate_token(&config).await?;

    let flush_interval = config.clickhouse_flush_interval;
    let (writer_tx, mut writer_task) =
        create_writer(db.clone(), shutdown_rx.clone(), flush_interval).await?;

    let app = App {
        helix_client,
        token: Arc::new(token),
        users: UsersCache::default(),
        config: Arc::new(config),
        db: Arc::new(db),
        optout_codes: Arc::default(),
    };

    // The bot connects with anonymous credentials.
    let mut bot_task = tokio::spawn(bot::run(
        StaticLoginCredentials::anonymous(),
        app.clone(),
        writer_tx,
        shutdown_rx.clone(),
    ));
    let mut web_task = tokio::spawn(web::run(app, shutdown_rx.clone()));

    tokio::select! {
        _ = shutdown_rx.changed() => {
            debug!("Waiting for tasks to shut down");
            let cleanup_started = Instant::now();
            let grace_period = Duration::from_secs(SHUTDOWN_TIMEOUT_SECONDS);
            let all_tasks = try_join_all([bot_task, web_task, writer_task]);

            match timeout(grace_period, all_tasks).await {
                // Outer `Err`: the grace period elapsed before every task finished.
                Err(_) => Err(anyhow!(
                    "Tasks did not shut down after {} seconds",
                    SHUTDOWN_TIMEOUT_SECONDS
                )),
                // Inner `Err`: at least one task could not be joined.
                Ok(Err(err)) => Err(anyhow!("Could not shut down properly: {err}")),
                Ok(Ok(_)) => {
                    debug!("Cleanup finished in {}ms", cleanup_started.elapsed().as_millis());
                    Ok(())
                }
            }
        }
        // Any task ending before a shutdown was requested is a failure.
        _ = &mut bot_task => {
            Err(anyhow!("Bot task exited unexpectedly"))
        }
        _ = &mut web_task => {
            Err(anyhow!("Web task exited unexpectedly"))
        }
        _ = &mut writer_task => {
            Err(anyhow!("Writer task exited unexpectedly"))
        }
    }
}
/// Builds a [`Migrator`] from the given database client, source path and
/// channel ids, then runs it with `jobs` as its argument.
///
/// # Errors
///
/// Propagates any error from constructing the migrator or from running it.
async fn migrate(
    db: clickhouse::Client,
    source_logs_path: String,
    channel_ids: Vec<String>,
    jobs: usize,
) -> anyhow::Result<()> {
    Migrator::new(db, source_logs_path, channel_ids)
        .await?
        .run(jobs)
        .await
}
/// Requests a new app access token using the client id and client secret
/// stored in `config`, asking for `Scope::all()`.
///
/// A dedicated default `HelixClient` is created for the request. On success
/// an info-level log line is emitted.
///
/// # Errors
///
/// Returns the error from the token request if it fails.
async fn generate_token(config: &Config) -> anyhow::Result<AppAccessToken> {
    let http_client: HelixClient<reqwest::Client> = HelixClient::default();

    // The config keeps ownership of its credentials; the request takes copies.
    let id = config.client_id.clone();
    let secret = config.client_secret.clone();

    let app_token =
        AppAccessToken::get_app_access_token(&http_client, id.into(), secret.into(), Scope::all())
            .await?;

    info!("Generated new app token");
    Ok(app_token)
}
async fn listen_shutdown() -> watch::Receiver<()> {
let shutdown_signals = [SignalKind::interrupt(), SignalKind::terminate()];
let mut futures = FuturesUnordered::new();
for signal_kind in shutdown_signals {
let mut listener = signal(signal_kind).unwrap();
futures.push(async move {
listener.recv().await;
signal_kind
});
}
let (tx, rx) = watch::channel(());
tokio::spawn(async move {
futures.next().await;
info!("Received shutdown signal");
tx.send(()).unwrap();
});
rx
}