Compare commits

...

2 Commits

Author SHA1 Message Date
boring_nick 0b0e45fe96 bump dependencies and clippy fixes 2023-08-30 22:37:48 +03:00
bisspector 8f382b508a
Add query parameters to control date range of requested logs of channels and users. (#15)
* add from & to datetime params to channel logs query

* add from & to datetime params to user logs query
2023-08-30 22:05:43 +03:00
8 changed files with 572 additions and 614 deletions

678
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -5,36 +5,36 @@ edition = "2021"
[dependencies]
aide = { version = "0.11.0", features = ["axum", "redoc"] }
anyhow = "1.0.71"
axum = { version = "0.6.18", features = ["headers"] }
chrono = { version = "0.4.26", features = ["serde"] }
clap = { version = "4.3.4", features = ["derive"] }
anyhow = "1.0.75"
axum = { version = "0.6.20", features = ["headers"] }
chrono = { version = "0.4.27", features = ["serde"] }
clap = { version = "4.4.1", features = ["derive"] }
clickhouse = { version = "0.11.5", default-features = false, features = [
"lz4",
] }
dashmap = { version = "5.4.0", features = ["serde"] }
flate2 = "1.0.26"
dashmap = { version = "5.5.3", features = ["serde"] }
flate2 = "1.0.27"
futures = "0.3.28"
indexmap = "1.9.3"
lazy_static = "1.4.0"
mimalloc = { version = "0.1.37", default-features = false }
mimalloc = { version = "0.1.38", default-features = false }
mime_guess = "2.0.4"
prometheus = "0.13.3"
rand = "0.8.5"
rayon = "1.7.0"
reqwest = { version = "0.11.18", features = [
reqwest = { version = "0.11.20", features = [
"rustls-tls",
], default-features = false }
rust-embed = { version = "6.7.0", features = ["interpolate-folder-path"] }
schemars = "0.8.12"
serde = { version = "1.0.164", features = ["derive"] }
serde_json = { version = "1.0.97", features = ["preserve_order"] }
serde_repr = "0.1.12"
rust-embed = { version = "8.0.0", features = ["interpolate-folder-path"] }
schemars = "0.8.13"
serde = { version = "1.0.188", features = ["derive"] }
serde_json = { version = "1.0.105", features = ["preserve_order"] }
serde_repr = "0.1.16"
strum = { version = "0.25.0", features = ["derive"] }
thiserror = "1.0.40"
tokio = { version = "1.28.2", features = ["sync", "signal", "rt-multi-thread"] }
thiserror = "1.0.47"
tokio = { version = "1.32.0", features = ["sync", "signal", "rt-multi-thread"] }
tokio-stream = "0.1.14"
tower-http = { version = "0.4.0", features = [
tower-http = { version = "0.4.3", features = [
"trace",
"cors",
"normalize-path",
@ -42,7 +42,7 @@ tower-http = { version = "0.4.0", features = [
] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
twitch-irc = { version = "5.0.0", default-features = false, features = [
twitch-irc = { version = "5.0.1", default-features = false, features = [
"metrics-collection",
"transport-tcp-rustls-webpki-roots",
] }
@ -52,9 +52,9 @@ twitch_api2 = { version = "0.6.1", features = [
"twitch_oauth2",
] }
twitch = { git = "https://github.com/jprochazk/twitch-rs", features = ["simd"] }
axum-prometheus = "0.3.3"
axum-prometheus = "0.4.0"
metrics-prometheus = "0.4.1"
async-trait = "0.1.68"
async-trait = "0.1.73"
[dev-dependencies]
pretty_assertions = "1.3.0"
pretty_assertions = "1.4.0"

View File

@ -261,17 +261,15 @@ impl Bot {
if self.app.optout_codes.remove(*arg).is_some() {
self.app.optout_user(sender_id).await?;
Ok(())
} else if self.check_admin(sender_login).is_ok() {
let user_id = self.app.get_user_id_by_name(arg).await?;
self.app.optout_user(&user_id).await?;
Ok(())
} else {
if self.check_admin(sender_login).is_ok() {
let user_id = self.app.get_user_id_by_name(arg).await?;
self.app.optout_user(&user_id).await?;
Ok(())
} else {
Err(anyhow!("Invalid optout code"))
}
Err(anyhow!("Invalid optout code"))
}
}

View File

@ -6,10 +6,7 @@ pub use migrations::run as setup_db;
use crate::{
error::Error,
logs::{
schema::{ChannelLogDate, UserLogDate},
stream::LogsStream,
},
logs::{schema::LogRangeParams, stream::LogsStream},
web::schema::AvailableLogDate,
Result,
};
@ -21,19 +18,25 @@ use tracing::info;
pub async fn read_channel(
db: &Client,
channel_id: &str,
log_date: ChannelLogDate,
reverse: bool,
limit: Option<u64>,
offset: Option<u64>,
params: &LogRangeParams,
) -> Result<LogsStream> {
let suffix = if reverse { "DESC" } else { "ASC" };
let mut query = format!("SELECT raw FROM message WHERE channel_id = ? AND toStartOfDay(timestamp) = ? ORDER BY timestamp {suffix}");
apply_limit_offset(&mut query, limit, offset);
let suffix = if params.logs_params.reverse {
"DESC"
} else {
"ASC"
};
let mut query = format!("SELECT raw FROM message WHERE channel_id = ? AND timestamp >= ? AND timestamp < ? ORDER BY timestamp {suffix}");
apply_limit_offset(
&mut query,
params.logs_params.limit,
params.logs_params.offset,
);
let cursor = db
.query(&query)
.bind(channel_id)
.bind(log_date.to_string())
.bind(params.from.timestamp_millis() as f64 / 1000.0)
.bind(params.to.timestamp_millis() as f64 / 1000.0)
.fetch()?;
LogsStream::new_cursor(cursor).await
}
@ -42,22 +45,27 @@ pub async fn read_user(
db: &Client,
channel_id: &str,
user_id: &str,
log_date: UserLogDate,
reverse: bool,
limit: Option<u64>,
offset: Option<u64>,
params: &LogRangeParams,
) -> Result<LogsStream> {
let suffix = if reverse { "DESC" } else { "ASC" };
let mut query = format!("SELECT raw FROM message WHERE channel_id = ? AND user_id = ? AND toStartOfMonth(timestamp) = ? ORDER BY timestamp {suffix}");
apply_limit_offset(&mut query, limit, offset);
let suffix = if params.logs_params.reverse {
"DESC"
} else {
"ASC"
};
let mut query = format!("SELECT raw FROM message WHERE channel_id = ? AND user_id = ? AND timestamp >= ? AND timestamp < ? ORDER BY timestamp {suffix}");
apply_limit_offset(
&mut query,
params.logs_params.limit,
params.logs_params.offset,
);
let cursor = db
.query(&query)
.bind(channel_id)
.bind(user_id)
.bind(format!("{}-{:0>2}-1", log_date.year, log_date.month))
.bind(params.from.timestamp_millis() as f64 / 1000.0)
.bind(params.to.timestamp_millis() as f64 / 1000.0)
.fetch()?;
LogsStream::new_cursor(cursor).await
}

View File

@ -1,41 +1,21 @@
pub mod message;
use chrono::{Datelike, NaiveDate, Utc};
use chrono::{DateTime, Utc};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::fmt::Display;
use serde::Deserialize;
#[derive(Serialize, Deserialize, Debug, JsonSchema, Clone, Copy)]
pub struct ChannelLogDate {
pub year: u32,
pub month: u32,
pub day: u32,
}
use crate::web::schema::LogsParams;
impl ChannelLogDate {
pub fn is_today(&self) -> bool {
Some(Utc::now().date_naive())
== NaiveDate::from_ymd_opt(self.year as i32, self.month, self.day)
}
}
impl Display for ChannelLogDate {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}-{:0>2}-{:0>2}", self.year, self.month, self.day)
}
}
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub struct UserLogDate {
pub year: u32,
pub month: u32,
}
impl UserLogDate {
pub fn is_current_month(&self) -> bool {
let current = Utc::now().date_naive();
current.year() as u32 == self.year && current.month() == self.month
}
#[derive(Deserialize, JsonSchema)]
pub struct LogRangeParams {
#[schemars(with = "String")]
/// RFC 3339 start date
pub from: DateTime<Utc>,
#[schemars(with = "String")]
/// RFC 3339 end date
pub to: DateTime<Utc>,
#[serde(flatten)]
pub logs_params: LogsParams,
}
#[derive(Deserialize)]

View File

@ -1,8 +1,9 @@
use super::{
responders::logs::LogsResponse,
schema::{
AvailableLogs, AvailableLogsParams, Channel, ChannelIdType, ChannelLogsPath, ChannelParam,
ChannelsList, LogsParams, LogsPathChannel, UserLogPathParams, UserLogsPath, UserParam,
AvailableLogs, AvailableLogsParams, Channel, ChannelIdType, ChannelLogsByDatePath,
ChannelParam, ChannelsList, LogsParams, LogsPathChannel, UserLogPathParams, UserLogsPath,
UserParam,
},
};
use crate::{
@ -12,19 +13,18 @@ use crate::{
read_random_channel_line, read_random_user_line, read_user,
},
error::Error,
logs::{
schema::{ChannelLogDate, UserLogDate},
stream::LogsStream,
},
logs::{schema::LogRangeParams, stream::LogsStream},
web::schema::LogsPathDate,
Result,
};
use aide::axum::IntoApiResponse;
use axum::{
extract::{Path, Query, RawQuery, State},
headers::CacheControl,
response::Redirect,
response::{IntoResponse, Redirect, Response},
Json, TypedHeader,
};
use chrono::{Days, Months, NaiveDate, NaiveTime, Utc};
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use std::time::Duration;
use tracing::debug;
@ -47,46 +47,85 @@ pub async fn get_channels(app: State<App>) -> impl IntoApiResponse {
}
pub async fn get_channel_logs(
Path(LogsPathChannel {
channel_id_type,
channel,
}): Path<LogsPathChannel>,
range_params: Option<Query<LogRangeParams>>,
RawQuery(query): RawQuery,
app: State<App>,
Path(channel_log_params): Path<ChannelLogsPath>,
) -> Result<Response> {
let channel_id = match channel_id_type {
ChannelIdType::Name => app.get_user_id_by_name(&channel).await?,
ChannelIdType::Id => channel.clone(),
};
if let Some(Query(params)) = range_params {
let logs = get_channel_logs_inner(&app, &channel_id, params).await?;
Ok(logs.into_response())
} else {
let available_logs = read_available_channel_logs(&app.db, &channel_id).await?;
let latest_log = available_logs.first().ok_or(Error::NotFound)?;
let mut new_uri = format!("/{channel_id_type}/{channel}/{latest_log}");
if let Some(query) = query {
new_uri.push('?');
new_uri.push_str(&query);
}
Ok(Redirect::to(&new_uri).into_response())
}
}
pub async fn get_channel_logs_by_date(
app: State<App>,
Path(channel_log_params): Path<ChannelLogsByDatePath>,
Query(logs_params): Query<LogsParams>,
) -> Result<impl IntoApiResponse> {
debug!("Params: {logs_params:?}");
let channel_id = match channel_log_params.channel_info.channel_id_type {
ChannelIdType::Name => app
.get_users(
vec![],
vec![channel_log_params.channel_info.channel.clone()],
)
.await?
.into_keys()
.next()
.ok_or(Error::NotFound)?,
ChannelIdType::Name => {
app.get_user_id_by_name(&channel_log_params.channel_info.channel)
.await?
}
ChannelIdType::Id => channel_log_params.channel_info.channel.clone(),
};
app.check_opted_out(&channel_id, None)?;
let LogsPathDate { year, month, day } = channel_log_params.date;
let log_date = ChannelLogDate::try_from(channel_log_params.date)?;
debug!("Querying logs for date {log_date:?}");
let from = NaiveDate::from_ymd_opt(year.parse()?, month.parse()?, day.parse()?)
.ok_or_else(|| Error::InvalidParam("Invalid date".to_owned()))?
.and_time(NaiveTime::default())
.and_utc();
let to = from
.checked_add_days(Days::new(1))
.ok_or_else(|| Error::InvalidParam("Date out of range".to_owned()))?;
let stream = read_channel(
&app.db,
&channel_id,
log_date,
logs_params.reverse,
logs_params.limit,
logs_params.offset,
)
.await?;
let params = LogRangeParams {
from,
to,
logs_params,
};
get_channel_logs_inner(&app, &channel_id, params).await
}
async fn get_channel_logs_inner(
app: &App,
channel_id: &str,
channel_log_params: LogRangeParams,
) -> Result<impl IntoApiResponse> {
app.check_opted_out(channel_id, None)?;
let stream = read_channel(&app.db, channel_id, &channel_log_params).await?;
let logs = LogsResponse {
response_type: logs_params.response_type(),
response_type: channel_log_params.logs_params.response_type(),
stream,
};
let cache = if log_date.is_today() {
let cache = if Utc::now() < channel_log_params.to {
no_cache_header()
} else {
cache_header(36000)
@ -96,71 +135,132 @@ pub async fn get_channel_logs(
}
pub async fn get_user_logs_by_name(
path: Path<UserLogPathParams>,
range_params: Option<Query<LogRangeParams>>,
query: RawQuery,
app: State<App>,
) -> Result<impl IntoApiResponse> {
get_user_logs(path, range_params, query, false, app).await
}
pub async fn get_user_logs_id(
path: Path<UserLogPathParams>,
range_params: Option<Query<LogRangeParams>>,
query: RawQuery,
app: State<App>,
) -> Result<impl IntoApiResponse> {
get_user_logs(path, range_params, query, true, app).await
}
async fn get_user_logs(
Path(UserLogPathParams {
channel_id_type,
channel,
user,
}): Path<UserLogPathParams>,
range_params: Option<Query<LogRangeParams>>,
RawQuery(query): RawQuery,
user_is_id: bool,
app: State<App>,
) -> Result<impl IntoApiResponse> {
let channel_id = match channel_id_type {
ChannelIdType::Name => app.get_user_id_by_name(&channel).await?,
ChannelIdType::Id => channel.clone(),
};
let user_id = if user_is_id {
user.clone()
} else {
app.get_user_id_by_name(&user).await?
};
if let Some(Query(params)) = range_params {
let logs = get_user_logs_inner(&app, &channel_id, &user_id, params).await?;
Ok(logs.into_response())
} else {
let available_logs = read_available_user_logs(&app.db, &channel_id, &user_id).await?;
let latest_log = available_logs.first().ok_or(Error::NotFound)?;
let user_id_type = if user_is_id { "userid" } else { "user" };
let mut new_uri =
format!("/{channel_id_type}/{channel}/{user_id_type}/{user}/{latest_log}");
if let Some(query) = query {
new_uri.push('?');
new_uri.push_str(&query);
}
Ok(Redirect::to(&new_uri).into_response())
}
}
pub async fn get_user_logs_by_date_name(
app: State<App>,
path: Path<UserLogsPath>,
params: Query<LogsParams>,
) -> Result<impl IntoApiResponse> {
let user_id = app
.get_users(vec![], vec![path.user.clone()])
.await?
.into_iter()
.next()
.ok_or(Error::NotFound)?
.0;
let user_id = app.get_user_id_by_name(&path.user).await?;
get_user_logs(app, path, params, user_id).await
get_user_logs_by_date(app, path, params, user_id).await
}
pub async fn get_user_logs_by_id(
pub async fn get_user_logs_by_date_id(
app: State<App>,
path: Path<UserLogsPath>,
params: Query<LogsParams>,
) -> Result<impl IntoApiResponse> {
let user_id = path.user.clone();
get_user_logs(app, path, params, user_id).await
get_user_logs_by_date(app, path, params, user_id).await
}
async fn get_user_logs(
async fn get_user_logs_by_date(
app: State<App>,
Path(user_logs_path): Path<UserLogsPath>,
Query(logs_params): Query<LogsParams>,
user_id: String,
) -> Result<impl IntoApiResponse> {
let log_date = UserLogDate::try_from(&user_logs_path)?;
let channel_id = match user_logs_path.channel_info.channel_id_type {
ChannelIdType::Name => {
let (id, _) = app
.get_users(vec![], vec![user_logs_path.channel_info.channel])
app.get_user_id_by_name(&user_logs_path.channel_info.channel)
.await?
.into_iter()
.next()
.ok_or(Error::NotFound)?;
id
}
ChannelIdType::Id => user_logs_path.channel_info.channel,
ChannelIdType::Id => user_logs_path.channel_info.channel.clone(),
};
app.check_opted_out(&channel_id, Some(&user_id))?;
let year = user_logs_path.year.parse()?;
let month = user_logs_path.month.parse()?;
let stream = read_user(
&app.db,
&channel_id,
&user_id,
log_date,
logs_params.reverse,
logs_params.limit,
logs_params.offset,
)
.await?;
let from = NaiveDate::from_ymd_opt(year, month, 1)
.ok_or_else(|| Error::InvalidParam("Invalid date".to_owned()))?
.and_time(NaiveTime::default())
.and_utc();
let to = from
.checked_add_months(Months::new(1))
.ok_or_else(|| Error::InvalidParam("Date out of range".to_owned()))?;
let params = LogRangeParams {
from,
to,
logs_params,
};
get_user_logs_inner(&app, &channel_id, &user_id, params).await
}
async fn get_user_logs_inner(
app: &App,
channel_id: &str,
user_id: &str,
log_params: LogRangeParams,
) -> Result<impl IntoApiResponse> {
app.check_opted_out(channel_id, Some(user_id))?;
let stream = read_user(&app.db, channel_id, user_id, &log_params).await?;
let logs = LogsResponse {
stream,
response_type: logs_params.response_type(),
response_type: log_params.logs_params.response_type(),
};
let cache = if log_date.is_current_month() {
let cache = if Utc::now() < log_params.to {
no_cache_header()
} else {
cache_header(36000)
@ -197,82 +297,6 @@ pub async fn list_available_logs(
}
}
pub async fn redirect_to_latest_channel_logs(
Path(LogsPathChannel {
channel_id_type,
channel,
}): Path<LogsPathChannel>,
RawQuery(query): RawQuery,
app: State<App>,
) -> Result<Redirect> {
let channel_id = match channel_id_type {
ChannelIdType::Name => app.get_user_id_by_name(&channel).await?,
ChannelIdType::Id => channel.clone(),
};
let available_logs = read_available_channel_logs(&app.db, &channel_id).await?;
let latest_log = available_logs.first().ok_or(Error::NotFound)?;
let mut new_uri = format!("/{channel_id_type}/{channel}/{latest_log}");
if let Some(query) = query {
new_uri.push('?');
new_uri.push_str(&query);
}
Ok(Redirect::to(&new_uri))
}
pub async fn redirect_to_latest_user_name_logs(
path: Path<UserLogPathParams>,
query: RawQuery,
app: State<App>,
) -> Result<Redirect> {
redirect_to_latest_user_logs(path, query, false, app).await
}
pub async fn redirect_to_latest_user_id_logs(
path: Path<UserLogPathParams>,
query: RawQuery,
app: State<App>,
) -> Result<Redirect> {
redirect_to_latest_user_logs(path, query, true, app).await
}
async fn redirect_to_latest_user_logs(
Path(UserLogPathParams {
channel_id_type,
channel,
user,
}): Path<UserLogPathParams>,
RawQuery(query): RawQuery,
user_is_id: bool,
app: State<App>,
) -> Result<Redirect> {
let channel_id = match channel_id_type {
ChannelIdType::Name => app.get_user_id_by_name(&channel).await?,
ChannelIdType::Id => channel.clone(),
};
let user_id = if user_is_id {
user.clone()
} else {
app.get_user_id_by_name(&user).await?
};
app.check_opted_out(&channel_id, Some(&user_id))?;
let available_logs = read_available_user_logs(&app.db, &channel_id, &user_id).await?;
let latest_log = available_logs.first().ok_or(Error::NotFound)?;
let user_id_type = if user_is_id { "userid" } else { "user" };
let mut new_uri = format!("/{channel_id_type}/{channel}/{user_id_type}/{user}/{latest_log}");
if let Some(query) = query {
new_uri.push('?');
new_uri.push_str(&query);
}
Ok(Redirect::to(&new_uri))
}
pub async fn random_channel_line(
app: State<App>,
Path(LogsPathChannel {

View File

@ -77,38 +77,38 @@ pub async fn run(app: App, mut shutdown_rx: ShutdownRx, bot_tx: Sender<BotMessag
)
.api_route(
"/:channel_id_type/:channel",
get_with(handlers::redirect_to_latest_channel_logs, |op| {
op.description("Get latest channel logs")
get_with(handlers::get_channel_logs, |op| {
op.description("Get channel logs. If the `to` and `from` query params are not given, redirect to latest available day")
}),
)
// For some reason axum considers it a path overlap if user id type is dynamic
.api_route(
"/:channel_id_type/:channel/user/:user",
get_with(handlers::redirect_to_latest_user_name_logs, |op| {
op.description("Get latest user logs")
get_with(handlers::get_user_logs_by_name, |op| {
op.description("Get user logs by name. If the `to` and `from` query params are not given, redirect to latest available month")
}),
)
.api_route(
"/:channel_id_type/:channel/userid/:user",
get_with(handlers::redirect_to_latest_user_id_logs, |op| {
op.description("Get latest user logs")
get_with(handlers::get_user_logs_id, |op| {
op.description("Get user logs by id. If the `to` and `from` query params are not given, redirect to latest available month")
}),
)
.api_route(
"/:channel_id_type/:channel/:year/:month/:day",
get_with(handlers::get_channel_logs, |op| {
get_with(handlers::get_channel_logs_by_date, |op| {
op.description("Get channel logs from the given day")
}),
)
.api_route(
"/:channel_id_type/:channel/user/:user/:year/:month",
get_with(handlers::get_user_logs_by_name, |op| {
get_with(handlers::get_user_logs_by_date_name, |op| {
op.description("Get user logs in a channel from the given month")
}),
)
.api_route(
"/:channel_id_type/:channel/userid/:user/:year/:month",
get_with(handlers::get_user_logs_by_id, |op| {
get_with(handlers::get_user_logs_by_date_id, |op| {
op.description("Get user logs in a channel from the given month")
}),
)

View File

@ -1,8 +1,7 @@
use super::responders::logs::{JsonResponseType, LogsResponseType};
use crate::logs::schema::{ChannelLogDate, UserLogDate};
use schemars::JsonSchema;
use serde::{Deserialize, Deserializer, Serialize};
use std::{fmt::Display, num::ParseIntError};
use std::fmt::Display;
#[derive(Serialize, JsonSchema)]
pub struct ChannelsList {
@ -44,32 +43,20 @@ pub struct UserLogsPath {
}
#[derive(Deserialize, JsonSchema)]
pub struct ChannelLogsPath {
pub struct ChannelLogsByDatePath {
#[serde(flatten)]
pub channel_info: LogsPathChannel,
#[serde(flatten)]
pub date: ChannelLogDatePath,
pub date: LogsPathDate,
}
#[derive(Deserialize, JsonSchema)]
pub struct ChannelLogDatePath {
pub struct LogsPathDate {
pub year: String,
pub month: String,
pub day: String,
}
impl TryFrom<ChannelLogDatePath> for ChannelLogDate {
type Error = ParseIntError;
fn try_from(value: ChannelLogDatePath) -> Result<Self, Self::Error> {
Ok(Self {
year: value.year.parse()?,
month: value.month.parse()?,
day: value.day.parse()?,
})
}
}
#[derive(Deserialize, JsonSchema)]
pub struct LogsPathChannel {
pub channel_id_type: ChannelIdType,
@ -116,17 +103,6 @@ where
Ok(Option::<&str>::deserialize(deserializer)?.is_some())
}
impl TryFrom<&UserLogsPath> for UserLogDate {
type Error = ParseIntError;
fn try_from(params: &UserLogsPath) -> Result<Self, Self::Error> {
Ok(Self {
year: params.year.parse()?,
month: params.month.parse()?,
})
}
}
#[derive(Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct AvailableLogs {