Compare commits

...

15 Commits

Author SHA1 Message Date
boring_nick 9aef7121fc Merge branch 'master' into supibot 2023-10-26 19:46:23 +03:00
boring_nick 0ede6d3f23 fix: avoid duplicate status code 2023-10-25 12:08:38 +03:00
boring_nick c44c9d8776 distinguish opted out channels and users 2023-10-24 18:49:53 +03:00
boring-nick b160f8452e
Add contributing guide and setup files (#16)
* Add contributing guide and setup files

* set the default db url to localhost

* minor clarifications

* update docs on installing from source

---------

Co-authored-by: boring_nick <boring-nick@users.noreply.github.com>
2023-09-09 15:48:10 +03:00
boring_nick b7a6bfbf7b implement capabilities reporting 2023-09-03 21:24:32 +03:00
boring_nick 26084111f7 cleanup 2023-09-03 16:58:29 +03:00
boring_nick c68e4c1112 split large channel log requests into multiple queries 2023-09-03 16:56:54 +03:00
boring_nick 0b0e45fe96 bump dependencies and clippy fixes 2023-08-30 22:37:48 +03:00
bisspector 8f382b508a
Add query parameters to control date range of requested logs of channels and users. (#15)
* add from & to datetime params to channel logs query

* add from & to datetime params to user logs query
2023-08-30 22:05:43 +03:00
boring_nick 0af071874b FailFish 2023-08-14 09:11:56 +03:00
bisspector 58b66e6be4
feature: admins can now manually opt out users (#12)
* feature: admins can now manually opt out users

* fix: formatting
2023-07-30 17:32:32 +03:00
boring_nick 607cbfb8b9 return proper error when fetching latest logs of a user who opted out 2023-07-22 11:10:54 +03:00
boring_nick d827f540e3 add api key info to admin openapi docs 2023-07-15 21:08:32 +03:00
boring_nick 7d2a72bcde check optout on random line 2023-07-12 20:44:37 +03:00
boring_nick 3dd4fb17b6 make migrator errors more verbose 2023-07-09 22:00:29 +03:00
17 changed files with 850 additions and 643 deletions

686
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -5,37 +5,37 @@ edition = "2021"
[dependencies]
aide = { version = "0.11.0", features = ["axum", "redoc"] }
anyhow = "1.0.71"
axum = { version = "0.6.18", features = ["headers"] }
chrono = { version = "0.4.26", features = ["serde"] }
clap = { version = "4.3.4", features = ["derive"] }
anyhow = "1.0.75"
axum = { version = "0.6.20", features = ["headers"] }
chrono = { version = "0.4.27", features = ["serde"] }
clap = { version = "4.4.1", features = ["derive"] }
clickhouse = { version = "0.11.5", default-features = false, features = [
"lz4",
] }
dashmap = { version = "5.4.0", features = ["serde"] }
flate2 = "1.0.26"
dashmap = { version = "5.5.3", features = ["serde"] }
flate2 = "1.0.27"
futures = "0.3.28"
indexmap = "1.9.3"
lazy_static = "1.4.0"
mimalloc = { version = "0.1.37", default-features = false }
mimalloc = { version = "0.1.38", default-features = false }
mime_guess = "2.0.4"
prometheus = "0.13.3"
rand = "0.8.5"
rayon = "1.7.0"
reqwest = { version = "0.11.18", features = [
reqwest = { version = "0.11.20", features = [
"rustls-tls",
"json",
], default-features = false }
rust-embed = { version = "6.7.0", features = ["interpolate-folder-path"] }
schemars = "0.8.12"
serde = { version = "1.0.164", features = ["derive"] }
serde_json = { version = "1.0.97", features = ["preserve_order"] }
serde_repr = "0.1.12"
rust-embed = { version = "8.0.0", features = ["interpolate-folder-path"] }
schemars = "0.8.13"
serde = { version = "1.0.188", features = ["derive"] }
serde_json = { version = "1.0.105", features = ["preserve_order"] }
serde_repr = "0.1.16"
strum = { version = "0.25.0", features = ["derive"] }
thiserror = "1.0.40"
tokio = { version = "1.28.2", features = ["sync", "signal", "rt-multi-thread"] }
thiserror = "1.0.47"
tokio = { version = "1.32.0", features = ["sync", "signal", "rt-multi-thread"] }
tokio-stream = "0.1.14"
tower-http = { version = "0.4.0", features = [
tower-http = { version = "0.4.3", features = [
"trace",
"cors",
"normalize-path",
@ -43,7 +43,7 @@ tower-http = { version = "0.4.0", features = [
] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
twitch-irc = { version = "5.0.0", default-features = false, features = [
twitch-irc = { version = "5.0.1", default-features = false, features = [
"metrics-collection",
"transport-tcp-rustls-webpki-roots",
] }
@ -53,11 +53,11 @@ twitch_api2 = { version = "0.6.1", features = [
"twitch_oauth2",
] }
twitch = { git = "https://github.com/jprochazk/twitch-rs", features = ["simd"] }
axum-prometheus = "0.3.3"
axum-prometheus = "0.4.0"
metrics-prometheus = "0.4.1"
csv = "1.2.2"
itertools = "0.11.0"
async-trait = "0.1.68"
async-trait = "0.1.73"
[dev-dependencies]
pretty_assertions = "1.3.0"
pretty_assertions = "1.4.0"

View File

@ -36,9 +36,9 @@ services:
### From source
- Set up Clickhouse
- `cargo install --locked --git https://github.com/boring-nick/rustlog`
- You can now run the `rustlog` binary
- Follow the [Contributing](#contributing) guide, excluding the last step
- `cargo build --release`
- The resulting binary will be at `target/release/rustlog`
## Advantages over justlog
@ -46,5 +46,49 @@ services:
- Blazing fast log queries with response streaming and a [highly performant IRC parser](https://github.com/jprochazk/twitch-rs)
- Support for ndjson logs responses
## Contributing
Requirements:
- rust
- yarn
- docker with docker-compose (optional, will need to set up Clickhouse manually without it)
Steps:
0. Clone the repository (make sure to include submodules!):
```
git clone --recursive https://github.com/boring-nick/rustlog
```
If you already cloned the repo without `--recursive`, you can initialize submodules with:
```
git submodule update --init --recursive
```
1. Set up the database (Clickhouse):
This repository provides a docker-compose to quickly set up Clickhouse. You can use it with:
```
docker-compose -f docker-compose.dev.yml up -d
```
Alternatively, you can install Clickhouse manually using the [official guide](https://clickhouse.com/docs/en/install).
2. Create a config file
Copy `config.dist.json` to `config.json` and configure your database and twitch credentials. If you installed Clickhouse with Docker, the default database configuration works.
3. Build the frontend:
```
cd web
yarn install
yarn build
cd ..
```
4. Build and run rustlog:
```
cargo run
```
You can now access rustlog at http://localhost:8025.
## Migrating from justlog
See [MIGRATION.md](./docs/MIGRATION.md)

12
config.dist.json Normal file
View File

@ -0,0 +1,12 @@
{
"clickhouseUrl": "http://localhost:8123",
"clickhouseDb": "rustlog",
"clickhouseUsername": null,
"clickhousePassword": null,
"listenAddress": "0.0.0.0:8025",
"channels": ["12345"],
"clientID": "id",
"clientSecret": "secret",
"admins": [],
"optOut": {}
}

15
docker-compose.dev.yml Normal file
View File

@ -0,0 +1,15 @@
version: "3.8"
services:
clickhouse:
image: clickhouse/clickhouse-server:latest
volumes:
- "rustlog_ch_data:/var/lib/clickhouse:rw"
environment:
CLICKHOUSE_DB: "rustlog"
ports:
- 8123:8123
- 9000:9000
volumes:
rustlog_ch_data:

View File

@ -1,10 +1,11 @@
pub mod cache;
use self::cache::UsersCache;
use crate::{config::Config, error::Error, Result};
use crate::{config::Config, db::delete_user_logs, error::Error, Result};
use anyhow::Context;
use dashmap::DashSet;
use std::{collections::HashMap, sync::Arc};
use tracing::debug;
use tracing::{debug, info};
use twitch_api2::{helix::users::GetUsersRequest, twitch_oauth2::AppAccessToken, HelixClient};
#[derive(Clone)]
@ -112,14 +113,26 @@ impl App {
}
}
pub async fn optout_user(&self, user_id: &str) -> anyhow::Result<()> {
delete_user_logs(&self.db, user_id)
.await
.context("Could not delete logs")?;
self.config.opt_out.insert(user_id.to_owned(), true);
self.config.save()?;
info!("User {user_id} opted out");
Ok(())
}
pub fn check_opted_out(&self, channel_id: &str, user_id: Option<&str>) -> Result<()> {
if self.config.opt_out.contains_key(channel_id) {
return Err(Error::OptedOut);
return Err(Error::ChannelOptedOut);
}
if let Some(user_id) = user_id {
if self.config.opt_out.contains_key(user_id) {
return Err(Error::OptedOut);
return Err(Error::UserOptedOut);
}
}

View File

@ -1,6 +1,6 @@
use crate::{
app::App,
db::{delete_user_logs, schema::Message},
db::schema::Message,
logs::extract::{extract_channel_and_user_from_raw, extract_raw_timestamp},
ShutdownRx,
};
@ -242,7 +242,7 @@ impl Bot {
.await?
}
"optout" => {
self.optout_user(&args, sender_id).await?;
self.optout_user(&args, sender_login, sender_id).await?;
}
_ => (),
}
@ -251,16 +251,22 @@ impl Bot {
Ok(())
}
async fn optout_user(&self, args: &[&str], sender_id: &str) -> anyhow::Result<()> {
let code = args.first().context("No optout code provided")?;
if self.app.optout_codes.remove(*code).is_some() {
delete_user_logs(&self.app.db, sender_id)
.await
.context("Could not delete logs")?;
async fn optout_user(
&self,
args: &[&str],
sender_login: &str,
sender_id: &str,
) -> anyhow::Result<()> {
let arg = args.first().context("No optout code provided")?;
if self.app.optout_codes.remove(*arg).is_some() {
self.app.optout_user(sender_id).await?;
Ok(())
} else if self.check_admin(sender_login).is_ok() {
let user_id = self.app.get_user_id_by_name(arg).await?;
self.app.optout_user(&user_id).await?;
self.app.config.opt_out.insert(sender_id.to_owned(), true);
self.app.config.save()?;
info!("User {sender_id} opted out");
Ok(())
} else {
Err(anyhow!("Invalid optout code"))

View File

@ -6,58 +6,126 @@ pub use migrations::run as setup_db;
use crate::{
error::Error,
logs::{
schema::{ChannelLogDate, UserLogDate},
stream::LogsStream,
},
logs::{schema::LogRangeParams, stream::LogsStream},
web::schema::AvailableLogDate,
Result,
};
use chrono::{Datelike, NaiveDateTime};
use clickhouse::Client;
use chrono::{DateTime, Datelike, Duration, NaiveDateTime, Utc};
use clickhouse::{query::RowCursor, Client};
use rand::{seq::IteratorRandom, thread_rng};
use tracing::info;
use tracing::{debug, info};
const CHANNEL_MULTI_QUERY_SIZE_DAYS: i64 = 14;
pub async fn read_channel(
db: &Client,
channel_id: &str,
log_date: ChannelLogDate,
reverse: bool,
limit: Option<u64>,
offset: Option<u64>,
params: &LogRangeParams,
) -> Result<LogsStream> {
let suffix = if reverse { "DESC" } else { "ASC" };
let mut query = format!("SELECT raw FROM message WHERE channel_id = ? AND toStartOfDay(timestamp) = ? ORDER BY timestamp {suffix}");
apply_limit_offset(&mut query, limit, offset);
let suffix = if params.logs_params.reverse {
"DESC"
} else {
"ASC"
};
let mut query = format!("SELECT raw FROM message WHERE channel_id = ? AND timestamp >= ? AND timestamp < ? ORDER BY timestamp {suffix}");
let interval = Duration::days(CHANNEL_MULTI_QUERY_SIZE_DAYS);
if params.to - params.from > interval {
let count = db
.query("SELECT count() FROM (SELECT timestamp FROM message WHERE channel_id = ? AND timestamp >= ? AND timestamp < ? LIMIT 1)")
.bind(channel_id)
.bind(params.from.timestamp_millis() as f64 / 1000.0)
.bind(params.to.timestamp_millis() as f64 / 1000.0)
.fetch_one::<i32>().await?;
if count == 0 {
return Err(Error::NotFound);
}
let mut streams = Vec::with_capacity(1);
let mut current_from = params.from;
let mut current_to = current_from + interval;
loop {
let cursor = next_cursor(db, &query, channel_id, current_from, current_to)?;
streams.push(cursor);
current_from += interval;
current_to += interval;
if current_to > params.to {
let cursor = next_cursor(db, &query, channel_id, current_from, params.to)?;
streams.push(cursor);
break;
}
}
if params.logs_params.reverse {
streams.reverse();
}
debug!("Using {} queries for multi-query stream", streams.len());
LogsStream::new_multi_query(streams)
} else {
apply_limit_offset(
&mut query,
params.logs_params.limit,
params.logs_params.offset,
);
let cursor = db
.query(&query)
.bind(channel_id)
.bind(params.from.timestamp_millis() as f64 / 1000.0)
.bind(params.to.timestamp_millis() as f64 / 1000.0)
.fetch()?;
LogsStream::new_cursor(cursor).await
}
}
fn next_cursor(
db: &Client,
query: &str,
channel_id: &str,
from: DateTime<Utc>,
to: DateTime<Utc>,
) -> Result<RowCursor<String>> {
let cursor = db
.query(&query)
.query(query)
.bind(channel_id)
.bind(log_date.to_string())
.bind(from.timestamp_millis() as f64 / 1000.0)
.bind(to.timestamp_millis() as f64 / 1000.0)
.fetch()?;
LogsStream::new_cursor(cursor).await
Ok(cursor)
}
pub async fn read_user(
db: &Client,
channel_id: &str,
user_id: &str,
log_date: UserLogDate,
reverse: bool,
limit: Option<u64>,
offset: Option<u64>,
params: &LogRangeParams,
) -> Result<LogsStream> {
let suffix = if reverse { "DESC" } else { "ASC" };
let mut query = format!("SELECT raw FROM message WHERE channel_id = ? AND user_id = ? AND toStartOfMonth(timestamp) = ? ORDER BY timestamp {suffix}");
apply_limit_offset(&mut query, limit, offset);
let suffix = if params.logs_params.reverse {
"DESC"
} else {
"ASC"
};
let mut query = format!("SELECT raw FROM message WHERE channel_id = ? AND user_id = ? AND timestamp >= ? AND timestamp < ? ORDER BY timestamp {suffix}");
apply_limit_offset(
&mut query,
params.logs_params.limit,
params.logs_params.offset,
);
let cursor = db
.query(&query)
.bind(channel_id)
.bind(user_id)
.bind(format!("{}-{:0>2}-1", log_date.year, log_date.month))
.bind(params.from.timestamp_millis() as f64 / 1000.0)
.bind(params.to.timestamp_millis() as f64 / 1000.0)
.fetch()?;
LogsStream::new_cursor(cursor).await
}

View File

@ -20,8 +20,10 @@ pub enum Error {
Internal,
#[error("Database error")]
Clickhouse(#[from] clickhouse::error::Error),
#[error("User or channel has opted out")]
OptedOut,
#[error("The requested channel has opted out of being logged")]
ChannelOptedOut,
#[error("The requested user has opted out of being logged")]
UserOptedOut,
#[error("Not found")]
NotFound,
}
@ -35,7 +37,7 @@ impl IntoResponse for Error {
StatusCode::INTERNAL_SERVER_ERROR
}
Error::ParseInt(_) | Error::InvalidParam(_) => StatusCode::BAD_REQUEST,
Error::OptedOut => StatusCode::FORBIDDEN,
Error::ChannelOptedOut | Error::UserOptedOut => StatusCode::FORBIDDEN,
Error::NotFound => StatusCode::NOT_FOUND,
};
@ -82,7 +84,7 @@ impl OperationOutput for Error {
(
Some(403),
aide::openapi::Response {
description: Error::OptedOut.to_string(),
description: "Channel or user has opted out".to_owned(),
..res.clone()
},
),

View File

@ -1,41 +1,21 @@
pub mod message;
use chrono::{Datelike, NaiveDate, Utc};
use chrono::{DateTime, Utc};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::fmt::Display;
use serde::Deserialize;
#[derive(Serialize, Deserialize, Debug, JsonSchema, Clone, Copy)]
pub struct ChannelLogDate {
pub year: u32,
pub month: u32,
pub day: u32,
}
use crate::web::schema::LogsParams;
impl ChannelLogDate {
pub fn is_today(&self) -> bool {
Some(Utc::now().date_naive())
== NaiveDate::from_ymd_opt(self.year as i32, self.month, self.day)
}
}
impl Display for ChannelLogDate {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}-{:0>2}-{:0>2}", self.year, self.month, self.day)
}
}
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub struct UserLogDate {
pub year: u32,
pub month: u32,
}
impl UserLogDate {
pub fn is_current_month(&self) -> bool {
let current = Utc::now().date_naive();
current.year() as u32 == self.year && current.month() == self.month
}
#[derive(Deserialize, JsonSchema)]
pub struct LogRangeParams {
#[schemars(with = "String")]
/// RFC 3339 start date
pub from: DateTime<Utc>,
#[schemars(with = "String")]
/// RFC 3339 end date
pub to: DateTime<Utc>,
#[serde(flatten)]
pub logs_params: LogsParams,
}
#[derive(Deserialize)]

View File

@ -17,6 +17,10 @@ pub enum LogsStream {
cursor: RowCursor<String>,
first_item: Option<String>,
},
MultiQuery {
cursors: Vec<RowCursor<String>>,
current: usize,
},
Provided(Iter<IntoIter<String>>),
}
@ -37,6 +41,17 @@ impl LogsStream {
Ok(Self::Provided(stream::iter(iter)))
}
}
pub fn new_multi_query(cursors: Vec<RowCursor<String>>) -> Result<Self> {
// if streams.is_empty() {
// return Err(Error::NotFound);
// }
Ok(Self::MultiQuery {
cursors,
current: 0,
})
}
}
impl Stream for LogsStream {
@ -55,6 +70,23 @@ impl Stream for LogsStream {
}
}
LogsStream::Provided(iter) => Pin::new(iter).poll_next(cx).map(|item| item.map(Ok)),
LogsStream::MultiQuery { cursors, current } => match cursors.get_mut(*current) {
Some(cursor) => {
let next_line_poll = {
let fut = cursor.next();
pin!(fut);
fut.poll(cx)
};
if let Poll::Ready(Ok(None)) = next_line_poll {
*current += 1;
self.poll_next(cx)
} else {
next_line_poll.map(|result| result.map_err(|err| err.into()).transpose())
}
}
None => Poll::Ready(None),
},
}
}
}

View File

@ -62,7 +62,7 @@ async fn main() -> anyhow::Result<()> {
}
if let Some(password) = &config.clickhouse_password {
db = db.with_user(password);
db = db.with_password(password);
}
let args = Args::parse();

View File

@ -122,7 +122,10 @@ impl Migrator {
.unwrap();
let day_bytes = migrator
.migrate_day(&root_path, &channel_id, date, &mut inserter)
.await?;
.await
.with_context(|| {
format!("Could not migrate channel {channel_id} date {date}")
})?;
total_read_bytes.fetch_add(day_bytes as u64, Ordering::SeqCst);
let processed_bytes = total_read_bytes.load(Ordering::SeqCst);
@ -213,10 +216,12 @@ impl Migrator {
) -> anyhow::Result<usize> {
let mut read_bytes = 0;
for line in reader.lines() {
let line = line?;
for (i, line) in reader.lines().enumerate() {
let line = line.with_context(|| format!("Could not read line {i} from input"))?;
read_bytes += line.len() + 1; // Add 1 byte for newline symbol
write_line(channel_id, line, inserter, datetime).await?;
write_line(channel_id, line, inserter, datetime)
.await
.with_context(|| format!("Could not write line {i} to inserter"))?;
}
let stats = inserter.commit().await?;

View File

@ -1,4 +1,10 @@
use crate::{app::App, bot::BotMessage, error::Error};
use aide::{
openapi::{
HeaderStyle, Parameter, ParameterData, ParameterSchemaOrContent, ReferenceOr, SchemaObject,
},
transform::TransformOperation,
};
use axum::{
extract::State,
http::Request,
@ -31,6 +37,31 @@ pub async fn admin_auth<B>(
Err((StatusCode::FORBIDDEN, "No, I don't think so"))
}
pub fn admin_auth_doc(op: &mut TransformOperation) {
let schema = aide::gen::in_context(|ctx| ctx.schema.subschema_for::<String>());
op.inner_mut()
.parameters
.push(ReferenceOr::Item(Parameter::Header {
parameter_data: ParameterData {
name: "X-Api-Key".to_owned(),
description: Some("Configured admin API key".to_owned()),
required: true,
deprecated: None,
format: ParameterSchemaOrContent::Schema(SchemaObject {
json_schema: schema,
external_docs: None,
example: None,
}),
example: None,
examples: Default::default(),
explode: None,
extensions: Default::default(),
},
style: HeaderStyle::Simple,
}));
}
#[derive(Deserialize, JsonSchema)]
pub struct ChannelsRequest {
/// List of channel ids

View File

@ -1,8 +1,9 @@
use super::{
responders::logs::LogsResponse,
schema::{
AvailableLogs, AvailableLogsParams, Channel, ChannelIdType, ChannelLogsPath, ChannelParam,
ChannelsList, LogsParams, LogsPathChannel, UserLogPathParams, UserLogsPath, UserParam,
AvailableLogs, AvailableLogsParams, Channel, ChannelIdType, ChannelLogsByDatePath,
ChannelParam, ChannelsList, LogsParams, LogsPathChannel, UserLogPathParams, UserLogsPath,
UserParam,
},
};
use crate::{
@ -12,19 +13,18 @@ use crate::{
read_random_channel_line, read_random_user_line, read_user,
},
error::Error,
logs::{
schema::{ChannelLogDate, UserLogDate},
stream::LogsStream,
},
logs::{schema::LogRangeParams, stream::LogsStream},
web::schema::LogsPathDate,
Result,
};
use aide::axum::IntoApiResponse;
use axum::{
extract::{Path, Query, RawQuery, State},
headers::CacheControl,
response::Redirect,
response::{IntoResponse, Redirect, Response},
Json, TypedHeader,
};
use chrono::{Days, Months, NaiveDate, NaiveTime, Utc};
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use std::time::Duration;
use tracing::debug;
@ -47,46 +47,85 @@ pub async fn get_channels(app: State<App>) -> impl IntoApiResponse {
}
pub async fn get_channel_logs(
Path(LogsPathChannel {
channel_id_type,
channel,
}): Path<LogsPathChannel>,
range_params: Option<Query<LogRangeParams>>,
RawQuery(query): RawQuery,
app: State<App>,
Path(channel_log_params): Path<ChannelLogsPath>,
) -> Result<Response> {
let channel_id = match channel_id_type {
ChannelIdType::Name => app.get_user_id_by_name(&channel).await?,
ChannelIdType::Id => channel.clone(),
};
if let Some(Query(params)) = range_params {
let logs = get_channel_logs_inner(&app, &channel_id, params).await?;
Ok(logs.into_response())
} else {
let available_logs = read_available_channel_logs(&app.db, &channel_id).await?;
let latest_log = available_logs.first().ok_or(Error::NotFound)?;
let mut new_uri = format!("/{channel_id_type}/{channel}/{latest_log}");
if let Some(query) = query {
new_uri.push('?');
new_uri.push_str(&query);
}
Ok(Redirect::to(&new_uri).into_response())
}
}
pub async fn get_channel_logs_by_date(
app: State<App>,
Path(channel_log_params): Path<ChannelLogsByDatePath>,
Query(logs_params): Query<LogsParams>,
) -> Result<impl IntoApiResponse> {
debug!("Params: {logs_params:?}");
let channel_id = match channel_log_params.channel_info.channel_id_type {
ChannelIdType::Name => app
.get_users(
vec![],
vec![channel_log_params.channel_info.channel.clone()],
)
.await?
.into_keys()
.next()
.ok_or(Error::NotFound)?,
ChannelIdType::Name => {
app.get_user_id_by_name(&channel_log_params.channel_info.channel)
.await?
}
ChannelIdType::Id => channel_log_params.channel_info.channel.clone(),
};
app.check_opted_out(&channel_id, None)?;
let LogsPathDate { year, month, day } = channel_log_params.date;
let log_date = ChannelLogDate::try_from(channel_log_params.date)?;
debug!("Querying logs for date {log_date:?}");
let from = NaiveDate::from_ymd_opt(year.parse()?, month.parse()?, day.parse()?)
.ok_or_else(|| Error::InvalidParam("Invalid date".to_owned()))?
.and_time(NaiveTime::default())
.and_utc();
let to = from
.checked_add_days(Days::new(1))
.ok_or_else(|| Error::InvalidParam("Date out of range".to_owned()))?;
let stream = read_channel(
&app.db,
&channel_id,
log_date,
logs_params.reverse,
logs_params.limit,
logs_params.offset,
)
.await?;
let params = LogRangeParams {
from,
to,
logs_params,
};
get_channel_logs_inner(&app, &channel_id, params).await
}
async fn get_channel_logs_inner(
app: &App,
channel_id: &str,
channel_log_params: LogRangeParams,
) -> Result<impl IntoApiResponse> {
app.check_opted_out(channel_id, None)?;
let stream = read_channel(&app.db, channel_id, &channel_log_params).await?;
let logs = LogsResponse {
response_type: logs_params.response_type(),
response_type: channel_log_params.logs_params.response_type(),
stream,
};
let cache = if log_date.is_today() {
let cache = if Utc::now() < channel_log_params.to {
no_cache_header()
} else {
cache_header(36000)
@ -96,71 +135,132 @@ pub async fn get_channel_logs(
}
pub async fn get_user_logs_by_name(
path: Path<UserLogPathParams>,
range_params: Option<Query<LogRangeParams>>,
query: RawQuery,
app: State<App>,
) -> Result<impl IntoApiResponse> {
get_user_logs(path, range_params, query, false, app).await
}
pub async fn get_user_logs_id(
path: Path<UserLogPathParams>,
range_params: Option<Query<LogRangeParams>>,
query: RawQuery,
app: State<App>,
) -> Result<impl IntoApiResponse> {
get_user_logs(path, range_params, query, true, app).await
}
async fn get_user_logs(
Path(UserLogPathParams {
channel_id_type,
channel,
user,
}): Path<UserLogPathParams>,
range_params: Option<Query<LogRangeParams>>,
RawQuery(query): RawQuery,
user_is_id: bool,
app: State<App>,
) -> Result<impl IntoApiResponse> {
let channel_id = match channel_id_type {
ChannelIdType::Name => app.get_user_id_by_name(&channel).await?,
ChannelIdType::Id => channel.clone(),
};
let user_id = if user_is_id {
user.clone()
} else {
app.get_user_id_by_name(&user).await?
};
if let Some(Query(params)) = range_params {
let logs = get_user_logs_inner(&app, &channel_id, &user_id, params).await?;
Ok(logs.into_response())
} else {
let available_logs = read_available_user_logs(&app.db, &channel_id, &user_id).await?;
let latest_log = available_logs.first().ok_or(Error::NotFound)?;
let user_id_type = if user_is_id { "userid" } else { "user" };
let mut new_uri =
format!("/{channel_id_type}/{channel}/{user_id_type}/{user}/{latest_log}");
if let Some(query) = query {
new_uri.push('?');
new_uri.push_str(&query);
}
Ok(Redirect::to(&new_uri).into_response())
}
}
pub async fn get_user_logs_by_date_name(
app: State<App>,
path: Path<UserLogsPath>,
params: Query<LogsParams>,
) -> Result<impl IntoApiResponse> {
let user_id = app
.get_users(vec![], vec![path.user.clone()])
.await?
.into_iter()
.next()
.ok_or(Error::NotFound)?
.0;
let user_id = app.get_user_id_by_name(&path.user).await?;
get_user_logs(app, path, params, user_id).await
get_user_logs_by_date(app, path, params, user_id).await
}
pub async fn get_user_logs_by_id(
pub async fn get_user_logs_by_date_id(
app: State<App>,
path: Path<UserLogsPath>,
params: Query<LogsParams>,
) -> Result<impl IntoApiResponse> {
let user_id = path.user.clone();
get_user_logs(app, path, params, user_id).await
get_user_logs_by_date(app, path, params, user_id).await
}
async fn get_user_logs(
async fn get_user_logs_by_date(
app: State<App>,
Path(user_logs_path): Path<UserLogsPath>,
Query(logs_params): Query<LogsParams>,
user_id: String,
) -> Result<impl IntoApiResponse> {
let log_date = UserLogDate::try_from(&user_logs_path)?;
let channel_id = match user_logs_path.channel_info.channel_id_type {
ChannelIdType::Name => {
let (id, _) = app
.get_users(vec![], vec![user_logs_path.channel_info.channel])
app.get_user_id_by_name(&user_logs_path.channel_info.channel)
.await?
.into_iter()
.next()
.ok_or(Error::NotFound)?;
id
}
ChannelIdType::Id => user_logs_path.channel_info.channel,
ChannelIdType::Id => user_logs_path.channel_info.channel.clone(),
};
app.check_opted_out(&channel_id, Some(&user_id))?;
let year = user_logs_path.year.parse()?;
let month = user_logs_path.month.parse()?;
let stream = read_user(
&app.db,
&channel_id,
&user_id,
log_date,
logs_params.reverse,
logs_params.limit,
logs_params.offset,
)
.await?;
let from = NaiveDate::from_ymd_opt(year, month, 1)
.ok_or_else(|| Error::InvalidParam("Invalid date".to_owned()))?
.and_time(NaiveTime::default())
.and_utc();
let to = from
.checked_add_months(Months::new(1))
.ok_or_else(|| Error::InvalidParam("Date out of range".to_owned()))?;
let params = LogRangeParams {
from,
to,
logs_params,
};
get_user_logs_inner(&app, &channel_id, &user_id, params).await
}
async fn get_user_logs_inner(
app: &App,
channel_id: &str,
user_id: &str,
log_params: LogRangeParams,
) -> Result<impl IntoApiResponse> {
app.check_opted_out(channel_id, Some(user_id))?;
let stream = read_user(&app.db, channel_id, user_id, &log_params).await?;
let logs = LogsResponse {
stream,
response_type: logs_params.response_type(),
response_type: log_params.logs_params.response_type(),
};
let cache = if log_date.is_current_month() {
let cache = if Utc::now() < log_params.to {
no_cache_header()
} else {
cache_header(36000)
@ -197,80 +297,6 @@ pub async fn list_available_logs(
}
}
pub async fn redirect_to_latest_channel_logs(
Path(LogsPathChannel {
channel_id_type,
channel,
}): Path<LogsPathChannel>,
RawQuery(query): RawQuery,
app: State<App>,
) -> Result<Redirect> {
let channel_id = match channel_id_type {
ChannelIdType::Name => app.get_user_id_by_name(&channel).await?,
ChannelIdType::Id => channel.clone(),
};
let available_logs = read_available_channel_logs(&app.db, &channel_id).await?;
let latest_log = available_logs.first().ok_or(Error::NotFound)?;
let mut new_uri = format!("/{channel_id_type}/{channel}/{latest_log}");
if let Some(query) = query {
new_uri.push('?');
new_uri.push_str(&query);
}
Ok(Redirect::to(&new_uri))
}
pub async fn redirect_to_latest_user_name_logs(
path: Path<UserLogPathParams>,
query: RawQuery,
app: State<App>,
) -> Result<Redirect> {
redirect_to_latest_user_logs(path, query, false, app).await
}
pub async fn redirect_to_latest_user_id_logs(
path: Path<UserLogPathParams>,
query: RawQuery,
app: State<App>,
) -> Result<Redirect> {
redirect_to_latest_user_logs(path, query, true, app).await
}
async fn redirect_to_latest_user_logs(
Path(UserLogPathParams {
channel_id_type,
channel,
user,
}): Path<UserLogPathParams>,
RawQuery(query): RawQuery,
user_is_id: bool,
app: State<App>,
) -> Result<Redirect> {
let channel_id = match channel_id_type {
ChannelIdType::Name => app.get_user_id_by_name(&channel).await?,
ChannelIdType::Id => channel.clone(),
};
let user_id = if user_is_id {
user.clone()
} else {
app.get_user_id_by_name(&user).await?
};
let available_logs = read_available_user_logs(&app.db, &channel_id, &user_id).await?;
let latest_log = available_logs.first().ok_or(Error::NotFound)?;
let user_id_type = if user_is_id { "userid" } else { "user" };
let mut new_uri = format!("/{channel_id_type}/{channel}/{user_id_type}/{user}/{latest_log}");
if let Some(query) = query {
new_uri.push('?');
new_uri.push_str(&query);
}
Ok(Redirect::to(&new_uri))
}
pub async fn random_channel_line(
app: State<App>,
Path(LogsPathChannel {
@ -331,6 +357,8 @@ async fn random_user_line(
ChannelIdType::Id => channel,
};
app.check_opted_out(&channel_id, Some(&user_id))?;
let random_line = read_random_user_line(&app.db, &channel_id, &user_id).await?;
let stream = LogsStream::new_provided(vec![random_line])?;

View File

@ -5,6 +5,7 @@ mod responders;
pub mod schema;
mod trace_layer;
use self::handlers::no_cache_header;
use crate::{app::App, bot::BotMessage, web::admin::admin_auth, ShutdownRx};
use aide::{
axum::{
@ -14,7 +15,12 @@ use aide::{
openapi::OpenApi,
redoc::Redoc,
};
use axum::{middleware, response::IntoResponse, Extension, Json, ServiceExt};
use axum::{
http::Request,
middleware::{self, Next},
response::{IntoResponse, Response},
Extension, Json, ServiceExt,
};
use axum_prometheus::PrometheusMetricLayerBuilder;
use prometheus::TextEncoder;
use std::{
@ -29,7 +35,7 @@ use tower_http::{
};
use tracing::{debug, info};
use self::handlers::no_cache_header;
const CAPABILITIES: &[&str] = &["arbitrary-range-query"];
pub async fn run(app: App, mut shutdown_rx: ShutdownRx, bot_tx: Sender<BotMessage>) {
aide::gen::on_error(|error| {
@ -50,10 +56,12 @@ pub async fn run(app: App, mut shutdown_rx: ShutdownRx, bot_tx: Sender<BotMessag
let admin_routes = ApiRouter::new()
.api_route(
"/channels",
post_with(admin::add_channels, |op| {
post_with(admin::add_channels, |mut op| {
admin::admin_auth_doc(&mut op);
op.tag("Admin").description("Join the specified channels")
})
.delete_with(admin::remove_channels, |op| {
.delete_with(admin::remove_channels, |mut op| {
admin::admin_auth_doc(&mut op);
op.tag("Admin").description("Leave the specified channels")
}),
)
@ -76,38 +84,38 @@ pub async fn run(app: App, mut shutdown_rx: ShutdownRx, bot_tx: Sender<BotMessag
)
.api_route(
"/:channel_id_type/:channel",
get_with(handlers::redirect_to_latest_channel_logs, |op| {
op.description("Get latest channel logs")
get_with(handlers::get_channel_logs, |op| {
op.description("Get channel logs. If the `to` and `from` query params are not given, redirect to latest available day")
}),
)
// For some reason axum considers it a path overlap if user id type is dynamic
.api_route(
"/:channel_id_type/:channel/user/:user",
get_with(handlers::redirect_to_latest_user_name_logs, |op| {
op.description("Get latest user logs")
get_with(handlers::get_user_logs_by_name, |op| {
op.description("Get user logs by name. If the `to` and `from` query params are not given, redirect to latest available month")
}),
)
.api_route(
"/:channel_id_type/:channel/userid/:user",
get_with(handlers::redirect_to_latest_user_id_logs, |op| {
op.description("Get latest user logs")
get_with(handlers::get_user_logs_id, |op| {
op.description("Get user logs by id. If the `to` and `from` query params are not given, redirect to latest available month")
}),
)
.api_route(
"/:channel_id_type/:channel/:year/:month/:day",
get_with(handlers::get_channel_logs, |op| {
get_with(handlers::get_channel_logs_by_date, |op| {
op.description("Get channel logs from the given day")
}),
)
.api_route(
"/:channel_id_type/:channel/user/:user/:year/:month",
get_with(handlers::get_user_logs_by_name, |op| {
get_with(handlers::get_user_logs_by_date_name, |op| {
op.description("Get user logs in a channel from the given month")
}),
)
.api_route(
"/:channel_id_type/:channel/userid/:user/:year/:month",
get_with(handlers::get_user_logs_by_id, |op| {
get_with(handlers::get_user_logs_by_date_id, |op| {
op.description("Get user logs in a channel from the given month")
}),
)
@ -130,10 +138,12 @@ pub async fn run(app: App, mut shutdown_rx: ShutdownRx, bot_tx: Sender<BotMessag
}),
)
.api_route("/optout", post(handlers::optout))
.api_route("/capabilities", get(capabilities))
.route("/docs", Redoc::new("/openapi.json").axum_route())
.route("/openapi.json", get(serve_openapi))
.route("/assets/*asset", get(frontend::static_asset))
.fallback(frontend::static_asset)
.layer(middleware::from_fn(capabilities_header_middleware))
.layer(
TraceLayer::new_for_http()
.make_span_with(trace_layer::make_span_with)
@ -172,6 +182,19 @@ pub fn parse_listen_addr(addr: &str) -> Result<SocketAddr, AddrParseError> {
}
}
async fn capabilities() -> Json<Vec<&'static str>> {
Json(CAPABILITIES.to_vec())
}
async fn capabilities_header_middleware<B>(request: Request<B>, next: Next<B>) -> Response {
let mut response = next.run(request).await;
response.headers_mut().insert(
"x-rustlog-capabilities",
CAPABILITIES.join(",").try_into().unwrap(),
);
response
}
async fn metrics() -> impl IntoApiResponse {
let metric_families = prometheus::gather();

View File

@ -1,8 +1,7 @@
use super::responders::logs::{JsonResponseType, LogsResponseType};
use crate::logs::schema::{ChannelLogDate, UserLogDate};
use schemars::JsonSchema;
use serde::{Deserialize, Deserializer, Serialize};
use std::{fmt::Display, num::ParseIntError};
use std::fmt::Display;
#[derive(Serialize, JsonSchema)]
pub struct ChannelsList {
@ -44,32 +43,20 @@ pub struct UserLogsPath {
}
#[derive(Deserialize, JsonSchema)]
pub struct ChannelLogsPath {
pub struct ChannelLogsByDatePath {
#[serde(flatten)]
pub channel_info: LogsPathChannel,
#[serde(flatten)]
pub date: ChannelLogDatePath,
pub date: LogsPathDate,
}
#[derive(Deserialize, JsonSchema)]
pub struct ChannelLogDatePath {
pub struct LogsPathDate {
pub year: String,
pub month: String,
pub day: String,
}
impl TryFrom<ChannelLogDatePath> for ChannelLogDate {
type Error = ParseIntError;
fn try_from(value: ChannelLogDatePath) -> Result<Self, Self::Error> {
Ok(Self {
year: value.year.parse()?,
month: value.month.parse()?,
day: value.day.parse()?,
})
}
}
#[derive(Deserialize, JsonSchema)]
pub struct LogsPathChannel {
pub channel_id_type: ChannelIdType,
@ -116,17 +103,6 @@ where
Ok(Option::<&str>::deserialize(deserializer)?.is_some())
}
impl TryFrom<&UserLogsPath> for UserLogDate {
type Error = ParseIntError;
fn try_from(params: &UserLogsPath) -> Result<Self, Self::Error> {
Ok(Self {
year: params.year.parse()?,
month: params.month.parse()?,
})
}
}
#[derive(Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct AvailableLogs {