Compare commits
15 Commits
822f32ae5d
...
9aef7121fc
Author | SHA1 | Date |
---|---|---|
boring_nick | 9aef7121fc | |
boring_nick | 0ede6d3f23 | |
boring_nick | c44c9d8776 | |
boring-nick | b160f8452e | |
boring_nick | b7a6bfbf7b | |
boring_nick | 26084111f7 | |
boring_nick | c68e4c1112 | |
boring_nick | 0b0e45fe96 | |
bisspector | 8f382b508a | |
boring_nick | 0af071874b | |
bisspector | 58b66e6be4 | |
boring_nick | 607cbfb8b9 | |
boring_nick | d827f540e3 | |
boring_nick | 7d2a72bcde | |
boring_nick | 3dd4fb17b6 |
File diff suppressed because it is too large
Load Diff
40
Cargo.toml
40
Cargo.toml
|
@ -5,37 +5,37 @@ edition = "2021"
|
|||
|
||||
[dependencies]
|
||||
aide = { version = "0.11.0", features = ["axum", "redoc"] }
|
||||
anyhow = "1.0.71"
|
||||
axum = { version = "0.6.18", features = ["headers"] }
|
||||
chrono = { version = "0.4.26", features = ["serde"] }
|
||||
clap = { version = "4.3.4", features = ["derive"] }
|
||||
anyhow = "1.0.75"
|
||||
axum = { version = "0.6.20", features = ["headers"] }
|
||||
chrono = { version = "0.4.27", features = ["serde"] }
|
||||
clap = { version = "4.4.1", features = ["derive"] }
|
||||
clickhouse = { version = "0.11.5", default-features = false, features = [
|
||||
"lz4",
|
||||
] }
|
||||
dashmap = { version = "5.4.0", features = ["serde"] }
|
||||
flate2 = "1.0.26"
|
||||
dashmap = { version = "5.5.3", features = ["serde"] }
|
||||
flate2 = "1.0.27"
|
||||
futures = "0.3.28"
|
||||
indexmap = "1.9.3"
|
||||
lazy_static = "1.4.0"
|
||||
mimalloc = { version = "0.1.37", default-features = false }
|
||||
mimalloc = { version = "0.1.38", default-features = false }
|
||||
mime_guess = "2.0.4"
|
||||
prometheus = "0.13.3"
|
||||
rand = "0.8.5"
|
||||
rayon = "1.7.0"
|
||||
reqwest = { version = "0.11.18", features = [
|
||||
reqwest = { version = "0.11.20", features = [
|
||||
"rustls-tls",
|
||||
"json",
|
||||
], default-features = false }
|
||||
rust-embed = { version = "6.7.0", features = ["interpolate-folder-path"] }
|
||||
schemars = "0.8.12"
|
||||
serde = { version = "1.0.164", features = ["derive"] }
|
||||
serde_json = { version = "1.0.97", features = ["preserve_order"] }
|
||||
serde_repr = "0.1.12"
|
||||
rust-embed = { version = "8.0.0", features = ["interpolate-folder-path"] }
|
||||
schemars = "0.8.13"
|
||||
serde = { version = "1.0.188", features = ["derive"] }
|
||||
serde_json = { version = "1.0.105", features = ["preserve_order"] }
|
||||
serde_repr = "0.1.16"
|
||||
strum = { version = "0.25.0", features = ["derive"] }
|
||||
thiserror = "1.0.40"
|
||||
tokio = { version = "1.28.2", features = ["sync", "signal", "rt-multi-thread"] }
|
||||
thiserror = "1.0.47"
|
||||
tokio = { version = "1.32.0", features = ["sync", "signal", "rt-multi-thread"] }
|
||||
tokio-stream = "0.1.14"
|
||||
tower-http = { version = "0.4.0", features = [
|
||||
tower-http = { version = "0.4.3", features = [
|
||||
"trace",
|
||||
"cors",
|
||||
"normalize-path",
|
||||
|
@ -43,7 +43,7 @@ tower-http = { version = "0.4.0", features = [
|
|||
] }
|
||||
tracing = "0.1.37"
|
||||
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
|
||||
twitch-irc = { version = "5.0.0", default-features = false, features = [
|
||||
twitch-irc = { version = "5.0.1", default-features = false, features = [
|
||||
"metrics-collection",
|
||||
"transport-tcp-rustls-webpki-roots",
|
||||
] }
|
||||
|
@ -53,11 +53,11 @@ twitch_api2 = { version = "0.6.1", features = [
|
|||
"twitch_oauth2",
|
||||
] }
|
||||
twitch = { git = "https://github.com/jprochazk/twitch-rs", features = ["simd"] }
|
||||
axum-prometheus = "0.3.3"
|
||||
axum-prometheus = "0.4.0"
|
||||
metrics-prometheus = "0.4.1"
|
||||
csv = "1.2.2"
|
||||
itertools = "0.11.0"
|
||||
async-trait = "0.1.68"
|
||||
async-trait = "0.1.73"
|
||||
|
||||
[dev-dependencies]
|
||||
pretty_assertions = "1.3.0"
|
||||
pretty_assertions = "1.4.0"
|
||||
|
|
50
README.md
50
README.md
|
@ -36,9 +36,9 @@ services:
|
|||
|
||||
### From source
|
||||
|
||||
- Set up Clickhouse
|
||||
- `cargo install --locked --git https://github.com/boring-nick/rustlog`
|
||||
- You can now run the `rustlog` binary
|
||||
- Follow the [Contributing](Contributing) excluding the last step
|
||||
- `cargo build --release`
|
||||
- The resulting binary will be at `target/release/rustlog`
|
||||
|
||||
## Advantages over justlog
|
||||
|
||||
|
@ -46,5 +46,49 @@ services:
|
|||
- Blazing fast log queries with response streaming and a [highly performant IRC parser](https://github.com/jprochazk/twitch-rs)
|
||||
- Support for ndjson logs responses
|
||||
|
||||
## Contributing
|
||||
|
||||
Requirements:
|
||||
- rust
|
||||
- yarn
|
||||
- docker with docker-compose (optional, will need to set up Clickhouse manually without it)
|
||||
|
||||
Steps:
|
||||
|
||||
0. Clone the repository (make sure to include submodules!):
|
||||
```
|
||||
git clone --recursive https://github.com/boring-nick/rustlog
|
||||
```
|
||||
If you already cloned the repo without `--recursive`, you can initialize submodules with:
|
||||
```
|
||||
git submodule update --init --recursive
|
||||
```
|
||||
|
||||
1. Set up the database (Clickhouse):
|
||||
|
||||
This repository provides a docker-compose to quickly set up Clickhouse. You can use it with:
|
||||
```
|
||||
docker-compose -f docker-compose.dev.yml up -d
|
||||
```
|
||||
Alternatively, you can install Clickhouse manually using the [official guide](https://clickhouse.com/docs/en/install).
|
||||
|
||||
2. Create a config file
|
||||
|
||||
Copy `config.dist.json` to `config.json` and configure your database and twitch credentials. If you installed Clickhouse with Docker, the default database configuration works.
|
||||
|
||||
3. Build the frontend:
|
||||
```
|
||||
cd web
|
||||
yarn install
|
||||
yarn build
|
||||
cd ..
|
||||
```
|
||||
4. Build and run rustlog:
|
||||
```
|
||||
cargo run
|
||||
```
|
||||
|
||||
You can now access rustlog at http://localhost:8025.
|
||||
|
||||
## Migrating from justlog
|
||||
See [MIGRATION.md](./docs/MIGRATION.md)
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
{
|
||||
"clickhouseUrl": "http://localhost:8123",
|
||||
"clickhouseDb": "rustlog",
|
||||
"clickhouseUsername": null,
|
||||
"clickhousePassword": null,
|
||||
"listenAddress": "0.0.0.0:8025",
|
||||
"channels": ["12345"],
|
||||
"clientID": "id",
|
||||
"clientSecret": "secret",
|
||||
"admins": [],
|
||||
"optOut": {}
|
||||
}
|
|
@ -0,0 +1,15 @@
|
|||
version: "3.8"
|
||||
|
||||
services:
|
||||
clickhouse:
|
||||
image: clickhouse/clickhouse-server:latest
|
||||
volumes:
|
||||
- "rustlog_ch_data:/var/lib/clickhouse:rw"
|
||||
environment:
|
||||
CLICKHOUSE_DB: "rustlog"
|
||||
ports:
|
||||
- 8123:8123
|
||||
- 9000:9000
|
||||
|
||||
volumes:
|
||||
rustlog_ch_data:
|
|
@ -1,10 +1,11 @@
|
|||
pub mod cache;
|
||||
|
||||
use self::cache::UsersCache;
|
||||
use crate::{config::Config, error::Error, Result};
|
||||
use crate::{config::Config, db::delete_user_logs, error::Error, Result};
|
||||
use anyhow::Context;
|
||||
use dashmap::DashSet;
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use tracing::debug;
|
||||
use tracing::{debug, info};
|
||||
use twitch_api2::{helix::users::GetUsersRequest, twitch_oauth2::AppAccessToken, HelixClient};
|
||||
|
||||
#[derive(Clone)]
|
||||
|
@ -112,14 +113,26 @@ impl App {
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn optout_user(&self, user_id: &str) -> anyhow::Result<()> {
|
||||
delete_user_logs(&self.db, user_id)
|
||||
.await
|
||||
.context("Could not delete logs")?;
|
||||
|
||||
self.config.opt_out.insert(user_id.to_owned(), true);
|
||||
self.config.save()?;
|
||||
info!("User {user_id} opted out");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn check_opted_out(&self, channel_id: &str, user_id: Option<&str>) -> Result<()> {
|
||||
if self.config.opt_out.contains_key(channel_id) {
|
||||
return Err(Error::OptedOut);
|
||||
return Err(Error::ChannelOptedOut);
|
||||
}
|
||||
|
||||
if let Some(user_id) = user_id {
|
||||
if self.config.opt_out.contains_key(user_id) {
|
||||
return Err(Error::OptedOut);
|
||||
return Err(Error::UserOptedOut);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
28
src/bot.rs
28
src/bot.rs
|
@ -1,6 +1,6 @@
|
|||
use crate::{
|
||||
app::App,
|
||||
db::{delete_user_logs, schema::Message},
|
||||
db::schema::Message,
|
||||
logs::extract::{extract_channel_and_user_from_raw, extract_raw_timestamp},
|
||||
ShutdownRx,
|
||||
};
|
||||
|
@ -242,7 +242,7 @@ impl Bot {
|
|||
.await?
|
||||
}
|
||||
"optout" => {
|
||||
self.optout_user(&args, sender_id).await?;
|
||||
self.optout_user(&args, sender_login, sender_id).await?;
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
|
@ -251,16 +251,22 @@ impl Bot {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
async fn optout_user(&self, args: &[&str], sender_id: &str) -> anyhow::Result<()> {
|
||||
let code = args.first().context("No optout code provided")?;
|
||||
if self.app.optout_codes.remove(*code).is_some() {
|
||||
delete_user_logs(&self.app.db, sender_id)
|
||||
.await
|
||||
.context("Could not delete logs")?;
|
||||
async fn optout_user(
|
||||
&self,
|
||||
args: &[&str],
|
||||
sender_login: &str,
|
||||
sender_id: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
let arg = args.first().context("No optout code provided")?;
|
||||
if self.app.optout_codes.remove(*arg).is_some() {
|
||||
self.app.optout_user(sender_id).await?;
|
||||
|
||||
Ok(())
|
||||
} else if self.check_admin(sender_login).is_ok() {
|
||||
let user_id = self.app.get_user_id_by_name(arg).await?;
|
||||
|
||||
self.app.optout_user(&user_id).await?;
|
||||
|
||||
self.app.config.opt_out.insert(sender_id.to_owned(), true);
|
||||
self.app.config.save()?;
|
||||
info!("User {sender_id} opted out");
|
||||
Ok(())
|
||||
} else {
|
||||
Err(anyhow!("Invalid optout code"))
|
||||
|
|
120
src/db/mod.rs
120
src/db/mod.rs
|
@ -6,58 +6,126 @@ pub use migrations::run as setup_db;
|
|||
|
||||
use crate::{
|
||||
error::Error,
|
||||
logs::{
|
||||
schema::{ChannelLogDate, UserLogDate},
|
||||
stream::LogsStream,
|
||||
},
|
||||
logs::{schema::LogRangeParams, stream::LogsStream},
|
||||
web::schema::AvailableLogDate,
|
||||
Result,
|
||||
};
|
||||
use chrono::{Datelike, NaiveDateTime};
|
||||
use clickhouse::Client;
|
||||
use chrono::{DateTime, Datelike, Duration, NaiveDateTime, Utc};
|
||||
use clickhouse::{query::RowCursor, Client};
|
||||
use rand::{seq::IteratorRandom, thread_rng};
|
||||
use tracing::info;
|
||||
use tracing::{debug, info};
|
||||
|
||||
const CHANNEL_MULTI_QUERY_SIZE_DAYS: i64 = 14;
|
||||
|
||||
pub async fn read_channel(
|
||||
db: &Client,
|
||||
channel_id: &str,
|
||||
log_date: ChannelLogDate,
|
||||
reverse: bool,
|
||||
limit: Option<u64>,
|
||||
offset: Option<u64>,
|
||||
params: &LogRangeParams,
|
||||
) -> Result<LogsStream> {
|
||||
let suffix = if reverse { "DESC" } else { "ASC" };
|
||||
let mut query = format!("SELECT raw FROM message WHERE channel_id = ? AND toStartOfDay(timestamp) = ? ORDER BY timestamp {suffix}");
|
||||
apply_limit_offset(&mut query, limit, offset);
|
||||
let suffix = if params.logs_params.reverse {
|
||||
"DESC"
|
||||
} else {
|
||||
"ASC"
|
||||
};
|
||||
|
||||
let mut query = format!("SELECT raw FROM message WHERE channel_id = ? AND timestamp >= ? AND timestamp < ? ORDER BY timestamp {suffix}");
|
||||
|
||||
let interval = Duration::days(CHANNEL_MULTI_QUERY_SIZE_DAYS);
|
||||
if params.to - params.from > interval {
|
||||
let count = db
|
||||
.query("SELECT count() FROM (SELECT timestamp FROM message WHERE channel_id = ? AND timestamp >= ? AND timestamp < ? LIMIT 1)")
|
||||
.bind(channel_id)
|
||||
.bind(params.from.timestamp_millis() as f64 / 1000.0)
|
||||
.bind(params.to.timestamp_millis() as f64 / 1000.0)
|
||||
.fetch_one::<i32>().await?;
|
||||
if count == 0 {
|
||||
return Err(Error::NotFound);
|
||||
}
|
||||
|
||||
let mut streams = Vec::with_capacity(1);
|
||||
|
||||
let mut current_from = params.from;
|
||||
let mut current_to = current_from + interval;
|
||||
|
||||
loop {
|
||||
let cursor = next_cursor(db, &query, channel_id, current_from, current_to)?;
|
||||
streams.push(cursor);
|
||||
|
||||
current_from += interval;
|
||||
current_to += interval;
|
||||
|
||||
if current_to > params.to {
|
||||
let cursor = next_cursor(db, &query, channel_id, current_from, params.to)?;
|
||||
streams.push(cursor);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if params.logs_params.reverse {
|
||||
streams.reverse();
|
||||
}
|
||||
|
||||
debug!("Using {} queries for multi-query stream", streams.len());
|
||||
|
||||
LogsStream::new_multi_query(streams)
|
||||
} else {
|
||||
apply_limit_offset(
|
||||
&mut query,
|
||||
params.logs_params.limit,
|
||||
params.logs_params.offset,
|
||||
);
|
||||
|
||||
let cursor = db
|
||||
.query(&query)
|
||||
.bind(channel_id)
|
||||
.bind(params.from.timestamp_millis() as f64 / 1000.0)
|
||||
.bind(params.to.timestamp_millis() as f64 / 1000.0)
|
||||
.fetch()?;
|
||||
LogsStream::new_cursor(cursor).await
|
||||
}
|
||||
}
|
||||
|
||||
fn next_cursor(
|
||||
db: &Client,
|
||||
query: &str,
|
||||
channel_id: &str,
|
||||
from: DateTime<Utc>,
|
||||
to: DateTime<Utc>,
|
||||
) -> Result<RowCursor<String>> {
|
||||
let cursor = db
|
||||
.query(&query)
|
||||
.query(query)
|
||||
.bind(channel_id)
|
||||
.bind(log_date.to_string())
|
||||
.bind(from.timestamp_millis() as f64 / 1000.0)
|
||||
.bind(to.timestamp_millis() as f64 / 1000.0)
|
||||
.fetch()?;
|
||||
LogsStream::new_cursor(cursor).await
|
||||
Ok(cursor)
|
||||
}
|
||||
|
||||
pub async fn read_user(
|
||||
db: &Client,
|
||||
channel_id: &str,
|
||||
user_id: &str,
|
||||
log_date: UserLogDate,
|
||||
reverse: bool,
|
||||
limit: Option<u64>,
|
||||
offset: Option<u64>,
|
||||
params: &LogRangeParams,
|
||||
) -> Result<LogsStream> {
|
||||
let suffix = if reverse { "DESC" } else { "ASC" };
|
||||
let mut query = format!("SELECT raw FROM message WHERE channel_id = ? AND user_id = ? AND toStartOfMonth(timestamp) = ? ORDER BY timestamp {suffix}");
|
||||
apply_limit_offset(&mut query, limit, offset);
|
||||
let suffix = if params.logs_params.reverse {
|
||||
"DESC"
|
||||
} else {
|
||||
"ASC"
|
||||
};
|
||||
let mut query = format!("SELECT raw FROM message WHERE channel_id = ? AND user_id = ? AND timestamp >= ? AND timestamp < ? ORDER BY timestamp {suffix}");
|
||||
apply_limit_offset(
|
||||
&mut query,
|
||||
params.logs_params.limit,
|
||||
params.logs_params.offset,
|
||||
);
|
||||
|
||||
let cursor = db
|
||||
.query(&query)
|
||||
.bind(channel_id)
|
||||
.bind(user_id)
|
||||
.bind(format!("{}-{:0>2}-1", log_date.year, log_date.month))
|
||||
.bind(params.from.timestamp_millis() as f64 / 1000.0)
|
||||
.bind(params.to.timestamp_millis() as f64 / 1000.0)
|
||||
.fetch()?;
|
||||
|
||||
LogsStream::new_cursor(cursor).await
|
||||
}
|
||||
|
||||
|
|
10
src/error.rs
10
src/error.rs
|
@ -20,8 +20,10 @@ pub enum Error {
|
|||
Internal,
|
||||
#[error("Database error")]
|
||||
Clickhouse(#[from] clickhouse::error::Error),
|
||||
#[error("User or channel has opted out")]
|
||||
OptedOut,
|
||||
#[error("The requested channel has opted out of being logged")]
|
||||
ChannelOptedOut,
|
||||
#[error("The requested user has opted out of being logged")]
|
||||
UserOptedOut,
|
||||
#[error("Not found")]
|
||||
NotFound,
|
||||
}
|
||||
|
@ -35,7 +37,7 @@ impl IntoResponse for Error {
|
|||
StatusCode::INTERNAL_SERVER_ERROR
|
||||
}
|
||||
Error::ParseInt(_) | Error::InvalidParam(_) => StatusCode::BAD_REQUEST,
|
||||
Error::OptedOut => StatusCode::FORBIDDEN,
|
||||
Error::ChannelOptedOut | Error::UserOptedOut => StatusCode::FORBIDDEN,
|
||||
Error::NotFound => StatusCode::NOT_FOUND,
|
||||
};
|
||||
|
||||
|
@ -82,7 +84,7 @@ impl OperationOutput for Error {
|
|||
(
|
||||
Some(403),
|
||||
aide::openapi::Response {
|
||||
description: Error::OptedOut.to_string(),
|
||||
description: "Channel or user has opted out".to_owned(),
|
||||
..res.clone()
|
||||
},
|
||||
),
|
||||
|
|
|
@ -1,41 +1,21 @@
|
|||
pub mod message;
|
||||
|
||||
use chrono::{Datelike, NaiveDate, Utc};
|
||||
use chrono::{DateTime, Utc};
|
||||
use schemars::JsonSchema;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt::Display;
|
||||
use serde::Deserialize;
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, JsonSchema, Clone, Copy)]
|
||||
pub struct ChannelLogDate {
|
||||
pub year: u32,
|
||||
pub month: u32,
|
||||
pub day: u32,
|
||||
}
|
||||
use crate::web::schema::LogsParams;
|
||||
|
||||
impl ChannelLogDate {
|
||||
pub fn is_today(&self) -> bool {
|
||||
Some(Utc::now().date_naive())
|
||||
== NaiveDate::from_ymd_opt(self.year as i32, self.month, self.day)
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for ChannelLogDate {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}-{:0>2}-{:0>2}", self.year, self.month, self.day)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
|
||||
pub struct UserLogDate {
|
||||
pub year: u32,
|
||||
pub month: u32,
|
||||
}
|
||||
|
||||
impl UserLogDate {
|
||||
pub fn is_current_month(&self) -> bool {
|
||||
let current = Utc::now().date_naive();
|
||||
current.year() as u32 == self.year && current.month() == self.month
|
||||
}
|
||||
#[derive(Deserialize, JsonSchema)]
|
||||
pub struct LogRangeParams {
|
||||
#[schemars(with = "String")]
|
||||
/// RFC 3339 start date
|
||||
pub from: DateTime<Utc>,
|
||||
#[schemars(with = "String")]
|
||||
/// RFC 3339 end date
|
||||
pub to: DateTime<Utc>,
|
||||
#[serde(flatten)]
|
||||
pub logs_params: LogsParams,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
|
|
|
@ -17,6 +17,10 @@ pub enum LogsStream {
|
|||
cursor: RowCursor<String>,
|
||||
first_item: Option<String>,
|
||||
},
|
||||
MultiQuery {
|
||||
cursors: Vec<RowCursor<String>>,
|
||||
current: usize,
|
||||
},
|
||||
Provided(Iter<IntoIter<String>>),
|
||||
}
|
||||
|
||||
|
@ -37,6 +41,17 @@ impl LogsStream {
|
|||
Ok(Self::Provided(stream::iter(iter)))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_multi_query(cursors: Vec<RowCursor<String>>) -> Result<Self> {
|
||||
// if streams.is_empty() {
|
||||
// return Err(Error::NotFound);
|
||||
// }
|
||||
|
||||
Ok(Self::MultiQuery {
|
||||
cursors,
|
||||
current: 0,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for LogsStream {
|
||||
|
@ -55,6 +70,23 @@ impl Stream for LogsStream {
|
|||
}
|
||||
}
|
||||
LogsStream::Provided(iter) => Pin::new(iter).poll_next(cx).map(|item| item.map(Ok)),
|
||||
LogsStream::MultiQuery { cursors, current } => match cursors.get_mut(*current) {
|
||||
Some(cursor) => {
|
||||
let next_line_poll = {
|
||||
let fut = cursor.next();
|
||||
pin!(fut);
|
||||
fut.poll(cx)
|
||||
};
|
||||
|
||||
if let Poll::Ready(Ok(None)) = next_line_poll {
|
||||
*current += 1;
|
||||
self.poll_next(cx)
|
||||
} else {
|
||||
next_line_poll.map(|result| result.map_err(|err| err.into()).transpose())
|
||||
}
|
||||
}
|
||||
None => Poll::Ready(None),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -62,7 +62,7 @@ async fn main() -> anyhow::Result<()> {
|
|||
}
|
||||
|
||||
if let Some(password) = &config.clickhouse_password {
|
||||
db = db.with_user(password);
|
||||
db = db.with_password(password);
|
||||
}
|
||||
|
||||
let args = Args::parse();
|
||||
|
|
|
@ -122,7 +122,10 @@ impl Migrator {
|
|||
.unwrap();
|
||||
let day_bytes = migrator
|
||||
.migrate_day(&root_path, &channel_id, date, &mut inserter)
|
||||
.await?;
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("Could not migrate channel {channel_id} date {date}")
|
||||
})?;
|
||||
|
||||
total_read_bytes.fetch_add(day_bytes as u64, Ordering::SeqCst);
|
||||
let processed_bytes = total_read_bytes.load(Ordering::SeqCst);
|
||||
|
@ -213,10 +216,12 @@ impl Migrator {
|
|||
) -> anyhow::Result<usize> {
|
||||
let mut read_bytes = 0;
|
||||
|
||||
for line in reader.lines() {
|
||||
let line = line?;
|
||||
for (i, line) in reader.lines().enumerate() {
|
||||
let line = line.with_context(|| format!("Could not read line {i} from input"))?;
|
||||
read_bytes += line.len() + 1; // Add 1 byte for newline symbol
|
||||
write_line(channel_id, line, inserter, datetime).await?;
|
||||
write_line(channel_id, line, inserter, datetime)
|
||||
.await
|
||||
.with_context(|| format!("Could not write line {i} to inserter"))?;
|
||||
}
|
||||
|
||||
let stats = inserter.commit().await?;
|
||||
|
|
|
@ -1,4 +1,10 @@
|
|||
use crate::{app::App, bot::BotMessage, error::Error};
|
||||
use aide::{
|
||||
openapi::{
|
||||
HeaderStyle, Parameter, ParameterData, ParameterSchemaOrContent, ReferenceOr, SchemaObject,
|
||||
},
|
||||
transform::TransformOperation,
|
||||
};
|
||||
use axum::{
|
||||
extract::State,
|
||||
http::Request,
|
||||
|
@ -31,6 +37,31 @@ pub async fn admin_auth<B>(
|
|||
Err((StatusCode::FORBIDDEN, "No, I don't think so"))
|
||||
}
|
||||
|
||||
pub fn admin_auth_doc(op: &mut TransformOperation) {
|
||||
let schema = aide::gen::in_context(|ctx| ctx.schema.subschema_for::<String>());
|
||||
|
||||
op.inner_mut()
|
||||
.parameters
|
||||
.push(ReferenceOr::Item(Parameter::Header {
|
||||
parameter_data: ParameterData {
|
||||
name: "X-Api-Key".to_owned(),
|
||||
description: Some("Configured admin API key".to_owned()),
|
||||
required: true,
|
||||
deprecated: None,
|
||||
format: ParameterSchemaOrContent::Schema(SchemaObject {
|
||||
json_schema: schema,
|
||||
external_docs: None,
|
||||
example: None,
|
||||
}),
|
||||
example: None,
|
||||
examples: Default::default(),
|
||||
explode: None,
|
||||
extensions: Default::default(),
|
||||
},
|
||||
style: HeaderStyle::Simple,
|
||||
}));
|
||||
}
|
||||
|
||||
#[derive(Deserialize, JsonSchema)]
|
||||
pub struct ChannelsRequest {
|
||||
/// List of channel ids
|
||||
|
|
|
@ -1,8 +1,9 @@
|
|||
use super::{
|
||||
responders::logs::LogsResponse,
|
||||
schema::{
|
||||
AvailableLogs, AvailableLogsParams, Channel, ChannelIdType, ChannelLogsPath, ChannelParam,
|
||||
ChannelsList, LogsParams, LogsPathChannel, UserLogPathParams, UserLogsPath, UserParam,
|
||||
AvailableLogs, AvailableLogsParams, Channel, ChannelIdType, ChannelLogsByDatePath,
|
||||
ChannelParam, ChannelsList, LogsParams, LogsPathChannel, UserLogPathParams, UserLogsPath,
|
||||
UserParam,
|
||||
},
|
||||
};
|
||||
use crate::{
|
||||
|
@ -12,19 +13,18 @@ use crate::{
|
|||
read_random_channel_line, read_random_user_line, read_user,
|
||||
},
|
||||
error::Error,
|
||||
logs::{
|
||||
schema::{ChannelLogDate, UserLogDate},
|
||||
stream::LogsStream,
|
||||
},
|
||||
logs::{schema::LogRangeParams, stream::LogsStream},
|
||||
web::schema::LogsPathDate,
|
||||
Result,
|
||||
};
|
||||
use aide::axum::IntoApiResponse;
|
||||
use axum::{
|
||||
extract::{Path, Query, RawQuery, State},
|
||||
headers::CacheControl,
|
||||
response::Redirect,
|
||||
response::{IntoResponse, Redirect, Response},
|
||||
Json, TypedHeader,
|
||||
};
|
||||
use chrono::{Days, Months, NaiveDate, NaiveTime, Utc};
|
||||
use rand::{distributions::Alphanumeric, thread_rng, Rng};
|
||||
use std::time::Duration;
|
||||
use tracing::debug;
|
||||
|
@ -47,46 +47,85 @@ pub async fn get_channels(app: State<App>) -> impl IntoApiResponse {
|
|||
}
|
||||
|
||||
pub async fn get_channel_logs(
|
||||
Path(LogsPathChannel {
|
||||
channel_id_type,
|
||||
channel,
|
||||
}): Path<LogsPathChannel>,
|
||||
range_params: Option<Query<LogRangeParams>>,
|
||||
RawQuery(query): RawQuery,
|
||||
app: State<App>,
|
||||
Path(channel_log_params): Path<ChannelLogsPath>,
|
||||
) -> Result<Response> {
|
||||
let channel_id = match channel_id_type {
|
||||
ChannelIdType::Name => app.get_user_id_by_name(&channel).await?,
|
||||
ChannelIdType::Id => channel.clone(),
|
||||
};
|
||||
|
||||
if let Some(Query(params)) = range_params {
|
||||
let logs = get_channel_logs_inner(&app, &channel_id, params).await?;
|
||||
Ok(logs.into_response())
|
||||
} else {
|
||||
let available_logs = read_available_channel_logs(&app.db, &channel_id).await?;
|
||||
let latest_log = available_logs.first().ok_or(Error::NotFound)?;
|
||||
|
||||
let mut new_uri = format!("/{channel_id_type}/{channel}/{latest_log}");
|
||||
if let Some(query) = query {
|
||||
new_uri.push('?');
|
||||
new_uri.push_str(&query);
|
||||
}
|
||||
|
||||
Ok(Redirect::to(&new_uri).into_response())
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_channel_logs_by_date(
|
||||
app: State<App>,
|
||||
Path(channel_log_params): Path<ChannelLogsByDatePath>,
|
||||
Query(logs_params): Query<LogsParams>,
|
||||
) -> Result<impl IntoApiResponse> {
|
||||
debug!("Params: {logs_params:?}");
|
||||
|
||||
let channel_id = match channel_log_params.channel_info.channel_id_type {
|
||||
ChannelIdType::Name => app
|
||||
.get_users(
|
||||
vec![],
|
||||
vec![channel_log_params.channel_info.channel.clone()],
|
||||
)
|
||||
.await?
|
||||
.into_keys()
|
||||
.next()
|
||||
.ok_or(Error::NotFound)?,
|
||||
ChannelIdType::Name => {
|
||||
app.get_user_id_by_name(&channel_log_params.channel_info.channel)
|
||||
.await?
|
||||
}
|
||||
ChannelIdType::Id => channel_log_params.channel_info.channel.clone(),
|
||||
};
|
||||
|
||||
app.check_opted_out(&channel_id, None)?;
|
||||
let LogsPathDate { year, month, day } = channel_log_params.date;
|
||||
|
||||
let log_date = ChannelLogDate::try_from(channel_log_params.date)?;
|
||||
debug!("Querying logs for date {log_date:?}");
|
||||
let from = NaiveDate::from_ymd_opt(year.parse()?, month.parse()?, day.parse()?)
|
||||
.ok_or_else(|| Error::InvalidParam("Invalid date".to_owned()))?
|
||||
.and_time(NaiveTime::default())
|
||||
.and_utc();
|
||||
let to = from
|
||||
.checked_add_days(Days::new(1))
|
||||
.ok_or_else(|| Error::InvalidParam("Date out of range".to_owned()))?;
|
||||
|
||||
let stream = read_channel(
|
||||
&app.db,
|
||||
&channel_id,
|
||||
log_date,
|
||||
logs_params.reverse,
|
||||
logs_params.limit,
|
||||
logs_params.offset,
|
||||
)
|
||||
.await?;
|
||||
let params = LogRangeParams {
|
||||
from,
|
||||
to,
|
||||
logs_params,
|
||||
};
|
||||
|
||||
get_channel_logs_inner(&app, &channel_id, params).await
|
||||
}
|
||||
|
||||
async fn get_channel_logs_inner(
|
||||
app: &App,
|
||||
channel_id: &str,
|
||||
channel_log_params: LogRangeParams,
|
||||
) -> Result<impl IntoApiResponse> {
|
||||
app.check_opted_out(channel_id, None)?;
|
||||
|
||||
let stream = read_channel(&app.db, channel_id, &channel_log_params).await?;
|
||||
|
||||
let logs = LogsResponse {
|
||||
response_type: logs_params.response_type(),
|
||||
response_type: channel_log_params.logs_params.response_type(),
|
||||
stream,
|
||||
};
|
||||
|
||||
let cache = if log_date.is_today() {
|
||||
let cache = if Utc::now() < channel_log_params.to {
|
||||
no_cache_header()
|
||||
} else {
|
||||
cache_header(36000)
|
||||
|
@ -96,71 +135,132 @@ pub async fn get_channel_logs(
|
|||
}
|
||||
|
||||
pub async fn get_user_logs_by_name(
|
||||
path: Path<UserLogPathParams>,
|
||||
range_params: Option<Query<LogRangeParams>>,
|
||||
query: RawQuery,
|
||||
app: State<App>,
|
||||
) -> Result<impl IntoApiResponse> {
|
||||
get_user_logs(path, range_params, query, false, app).await
|
||||
}
|
||||
|
||||
pub async fn get_user_logs_id(
|
||||
path: Path<UserLogPathParams>,
|
||||
range_params: Option<Query<LogRangeParams>>,
|
||||
query: RawQuery,
|
||||
app: State<App>,
|
||||
) -> Result<impl IntoApiResponse> {
|
||||
get_user_logs(path, range_params, query, true, app).await
|
||||
}
|
||||
|
||||
async fn get_user_logs(
|
||||
Path(UserLogPathParams {
|
||||
channel_id_type,
|
||||
channel,
|
||||
user,
|
||||
}): Path<UserLogPathParams>,
|
||||
range_params: Option<Query<LogRangeParams>>,
|
||||
RawQuery(query): RawQuery,
|
||||
user_is_id: bool,
|
||||
app: State<App>,
|
||||
) -> Result<impl IntoApiResponse> {
|
||||
let channel_id = match channel_id_type {
|
||||
ChannelIdType::Name => app.get_user_id_by_name(&channel).await?,
|
||||
ChannelIdType::Id => channel.clone(),
|
||||
};
|
||||
let user_id = if user_is_id {
|
||||
user.clone()
|
||||
} else {
|
||||
app.get_user_id_by_name(&user).await?
|
||||
};
|
||||
|
||||
if let Some(Query(params)) = range_params {
|
||||
let logs = get_user_logs_inner(&app, &channel_id, &user_id, params).await?;
|
||||
Ok(logs.into_response())
|
||||
} else {
|
||||
let available_logs = read_available_user_logs(&app.db, &channel_id, &user_id).await?;
|
||||
let latest_log = available_logs.first().ok_or(Error::NotFound)?;
|
||||
|
||||
let user_id_type = if user_is_id { "userid" } else { "user" };
|
||||
|
||||
let mut new_uri =
|
||||
format!("/{channel_id_type}/{channel}/{user_id_type}/{user}/{latest_log}");
|
||||
if let Some(query) = query {
|
||||
new_uri.push('?');
|
||||
new_uri.push_str(&query);
|
||||
}
|
||||
Ok(Redirect::to(&new_uri).into_response())
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_user_logs_by_date_name(
|
||||
app: State<App>,
|
||||
path: Path<UserLogsPath>,
|
||||
params: Query<LogsParams>,
|
||||
) -> Result<impl IntoApiResponse> {
|
||||
let user_id = app
|
||||
.get_users(vec![], vec![path.user.clone()])
|
||||
.await?
|
||||
.into_iter()
|
||||
.next()
|
||||
.ok_or(Error::NotFound)?
|
||||
.0;
|
||||
let user_id = app.get_user_id_by_name(&path.user).await?;
|
||||
|
||||
get_user_logs(app, path, params, user_id).await
|
||||
get_user_logs_by_date(app, path, params, user_id).await
|
||||
}
|
||||
|
||||
pub async fn get_user_logs_by_id(
|
||||
pub async fn get_user_logs_by_date_id(
|
||||
app: State<App>,
|
||||
path: Path<UserLogsPath>,
|
||||
params: Query<LogsParams>,
|
||||
) -> Result<impl IntoApiResponse> {
|
||||
let user_id = path.user.clone();
|
||||
get_user_logs(app, path, params, user_id).await
|
||||
get_user_logs_by_date(app, path, params, user_id).await
|
||||
}
|
||||
|
||||
async fn get_user_logs(
|
||||
async fn get_user_logs_by_date(
|
||||
app: State<App>,
|
||||
Path(user_logs_path): Path<UserLogsPath>,
|
||||
Query(logs_params): Query<LogsParams>,
|
||||
user_id: String,
|
||||
) -> Result<impl IntoApiResponse> {
|
||||
let log_date = UserLogDate::try_from(&user_logs_path)?;
|
||||
|
||||
let channel_id = match user_logs_path.channel_info.channel_id_type {
|
||||
ChannelIdType::Name => {
|
||||
let (id, _) = app
|
||||
.get_users(vec![], vec![user_logs_path.channel_info.channel])
|
||||
app.get_user_id_by_name(&user_logs_path.channel_info.channel)
|
||||
.await?
|
||||
.into_iter()
|
||||
.next()
|
||||
.ok_or(Error::NotFound)?;
|
||||
id
|
||||
}
|
||||
|
||||
ChannelIdType::Id => user_logs_path.channel_info.channel,
|
||||
ChannelIdType::Id => user_logs_path.channel_info.channel.clone(),
|
||||
};
|
||||
|
||||
app.check_opted_out(&channel_id, Some(&user_id))?;
|
||||
let year = user_logs_path.year.parse()?;
|
||||
let month = user_logs_path.month.parse()?;
|
||||
|
||||
let stream = read_user(
|
||||
&app.db,
|
||||
&channel_id,
|
||||
&user_id,
|
||||
log_date,
|
||||
logs_params.reverse,
|
||||
logs_params.limit,
|
||||
logs_params.offset,
|
||||
)
|
||||
.await?;
|
||||
let from = NaiveDate::from_ymd_opt(year, month, 1)
|
||||
.ok_or_else(|| Error::InvalidParam("Invalid date".to_owned()))?
|
||||
.and_time(NaiveTime::default())
|
||||
.and_utc();
|
||||
let to = from
|
||||
.checked_add_months(Months::new(1))
|
||||
.ok_or_else(|| Error::InvalidParam("Date out of range".to_owned()))?;
|
||||
|
||||
let params = LogRangeParams {
|
||||
from,
|
||||
to,
|
||||
logs_params,
|
||||
};
|
||||
|
||||
get_user_logs_inner(&app, &channel_id, &user_id, params).await
|
||||
}
|
||||
|
||||
async fn get_user_logs_inner(
|
||||
app: &App,
|
||||
channel_id: &str,
|
||||
user_id: &str,
|
||||
log_params: LogRangeParams,
|
||||
) -> Result<impl IntoApiResponse> {
|
||||
app.check_opted_out(channel_id, Some(user_id))?;
|
||||
|
||||
let stream = read_user(&app.db, channel_id, user_id, &log_params).await?;
|
||||
|
||||
let logs = LogsResponse {
|
||||
stream,
|
||||
response_type: logs_params.response_type(),
|
||||
response_type: log_params.logs_params.response_type(),
|
||||
};
|
||||
|
||||
let cache = if log_date.is_current_month() {
|
||||
let cache = if Utc::now() < log_params.to {
|
||||
no_cache_header()
|
||||
} else {
|
||||
cache_header(36000)
|
||||
|
@ -197,80 +297,6 @@ pub async fn list_available_logs(
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn redirect_to_latest_channel_logs(
|
||||
Path(LogsPathChannel {
|
||||
channel_id_type,
|
||||
channel,
|
||||
}): Path<LogsPathChannel>,
|
||||
RawQuery(query): RawQuery,
|
||||
app: State<App>,
|
||||
) -> Result<Redirect> {
|
||||
let channel_id = match channel_id_type {
|
||||
ChannelIdType::Name => app.get_user_id_by_name(&channel).await?,
|
||||
ChannelIdType::Id => channel.clone(),
|
||||
};
|
||||
|
||||
let available_logs = read_available_channel_logs(&app.db, &channel_id).await?;
|
||||
let latest_log = available_logs.first().ok_or(Error::NotFound)?;
|
||||
|
||||
let mut new_uri = format!("/{channel_id_type}/{channel}/{latest_log}");
|
||||
if let Some(query) = query {
|
||||
new_uri.push('?');
|
||||
new_uri.push_str(&query);
|
||||
}
|
||||
|
||||
Ok(Redirect::to(&new_uri))
|
||||
}
|
||||
|
||||
pub async fn redirect_to_latest_user_name_logs(
|
||||
path: Path<UserLogPathParams>,
|
||||
query: RawQuery,
|
||||
app: State<App>,
|
||||
) -> Result<Redirect> {
|
||||
redirect_to_latest_user_logs(path, query, false, app).await
|
||||
}
|
||||
|
||||
pub async fn redirect_to_latest_user_id_logs(
|
||||
path: Path<UserLogPathParams>,
|
||||
query: RawQuery,
|
||||
app: State<App>,
|
||||
) -> Result<Redirect> {
|
||||
redirect_to_latest_user_logs(path, query, true, app).await
|
||||
}
|
||||
|
||||
async fn redirect_to_latest_user_logs(
|
||||
Path(UserLogPathParams {
|
||||
channel_id_type,
|
||||
channel,
|
||||
user,
|
||||
}): Path<UserLogPathParams>,
|
||||
RawQuery(query): RawQuery,
|
||||
user_is_id: bool,
|
||||
app: State<App>,
|
||||
) -> Result<Redirect> {
|
||||
let channel_id = match channel_id_type {
|
||||
ChannelIdType::Name => app.get_user_id_by_name(&channel).await?,
|
||||
ChannelIdType::Id => channel.clone(),
|
||||
};
|
||||
let user_id = if user_is_id {
|
||||
user.clone()
|
||||
} else {
|
||||
app.get_user_id_by_name(&user).await?
|
||||
};
|
||||
|
||||
let available_logs = read_available_user_logs(&app.db, &channel_id, &user_id).await?;
|
||||
let latest_log = available_logs.first().ok_or(Error::NotFound)?;
|
||||
|
||||
let user_id_type = if user_is_id { "userid" } else { "user" };
|
||||
|
||||
let mut new_uri = format!("/{channel_id_type}/{channel}/{user_id_type}/{user}/{latest_log}");
|
||||
if let Some(query) = query {
|
||||
new_uri.push('?');
|
||||
new_uri.push_str(&query);
|
||||
}
|
||||
Ok(Redirect::to(&new_uri))
|
||||
}
|
||||
|
||||
pub async fn random_channel_line(
|
||||
app: State<App>,
|
||||
Path(LogsPathChannel {
|
||||
|
@ -331,6 +357,8 @@ async fn random_user_line(
|
|||
ChannelIdType::Id => channel,
|
||||
};
|
||||
|
||||
app.check_opted_out(&channel_id, Some(&user_id))?;
|
||||
|
||||
let random_line = read_random_user_line(&app.db, &channel_id, &user_id).await?;
|
||||
let stream = LogsStream::new_provided(vec![random_line])?;
|
||||
|
||||
|
|
|
@ -5,6 +5,7 @@ mod responders;
|
|||
pub mod schema;
|
||||
mod trace_layer;
|
||||
|
||||
use self::handlers::no_cache_header;
|
||||
use crate::{app::App, bot::BotMessage, web::admin::admin_auth, ShutdownRx};
|
||||
use aide::{
|
||||
axum::{
|
||||
|
@ -14,7 +15,12 @@ use aide::{
|
|||
openapi::OpenApi,
|
||||
redoc::Redoc,
|
||||
};
|
||||
use axum::{middleware, response::IntoResponse, Extension, Json, ServiceExt};
|
||||
use axum::{
|
||||
http::Request,
|
||||
middleware::{self, Next},
|
||||
response::{IntoResponse, Response},
|
||||
Extension, Json, ServiceExt,
|
||||
};
|
||||
use axum_prometheus::PrometheusMetricLayerBuilder;
|
||||
use prometheus::TextEncoder;
|
||||
use std::{
|
||||
|
@ -29,7 +35,7 @@ use tower_http::{
|
|||
};
|
||||
use tracing::{debug, info};
|
||||
|
||||
use self::handlers::no_cache_header;
|
||||
const CAPABILITIES: &[&str] = &["arbitrary-range-query"];
|
||||
|
||||
pub async fn run(app: App, mut shutdown_rx: ShutdownRx, bot_tx: Sender<BotMessage>) {
|
||||
aide::gen::on_error(|error| {
|
||||
|
@ -50,10 +56,12 @@ pub async fn run(app: App, mut shutdown_rx: ShutdownRx, bot_tx: Sender<BotMessag
|
|||
let admin_routes = ApiRouter::new()
|
||||
.api_route(
|
||||
"/channels",
|
||||
post_with(admin::add_channels, |op| {
|
||||
post_with(admin::add_channels, |mut op| {
|
||||
admin::admin_auth_doc(&mut op);
|
||||
op.tag("Admin").description("Join the specified channels")
|
||||
})
|
||||
.delete_with(admin::remove_channels, |op| {
|
||||
.delete_with(admin::remove_channels, |mut op| {
|
||||
admin::admin_auth_doc(&mut op);
|
||||
op.tag("Admin").description("Leave the specified channels")
|
||||
}),
|
||||
)
|
||||
|
@ -76,38 +84,38 @@ pub async fn run(app: App, mut shutdown_rx: ShutdownRx, bot_tx: Sender<BotMessag
|
|||
)
|
||||
.api_route(
|
||||
"/:channel_id_type/:channel",
|
||||
get_with(handlers::redirect_to_latest_channel_logs, |op| {
|
||||
op.description("Get latest channel logs")
|
||||
get_with(handlers::get_channel_logs, |op| {
|
||||
op.description("Get channel logs. If the `to` and `from` query params are not given, redirect to latest available day")
|
||||
}),
|
||||
)
|
||||
// For some reason axum considers it a path overlap if user id type is dynamic
|
||||
.api_route(
|
||||
"/:channel_id_type/:channel/user/:user",
|
||||
get_with(handlers::redirect_to_latest_user_name_logs, |op| {
|
||||
op.description("Get latest user logs")
|
||||
get_with(handlers::get_user_logs_by_name, |op| {
|
||||
op.description("Get user logs by name. If the `to` and `from` query params are not given, redirect to latest available month")
|
||||
}),
|
||||
)
|
||||
.api_route(
|
||||
"/:channel_id_type/:channel/userid/:user",
|
||||
get_with(handlers::redirect_to_latest_user_id_logs, |op| {
|
||||
op.description("Get latest user logs")
|
||||
get_with(handlers::get_user_logs_id, |op| {
|
||||
op.description("Get user logs by id. If the `to` and `from` query params are not given, redirect to latest available month")
|
||||
}),
|
||||
)
|
||||
.api_route(
|
||||
"/:channel_id_type/:channel/:year/:month/:day",
|
||||
get_with(handlers::get_channel_logs, |op| {
|
||||
get_with(handlers::get_channel_logs_by_date, |op| {
|
||||
op.description("Get channel logs from the given day")
|
||||
}),
|
||||
)
|
||||
.api_route(
|
||||
"/:channel_id_type/:channel/user/:user/:year/:month",
|
||||
get_with(handlers::get_user_logs_by_name, |op| {
|
||||
get_with(handlers::get_user_logs_by_date_name, |op| {
|
||||
op.description("Get user logs in a channel from the given month")
|
||||
}),
|
||||
)
|
||||
.api_route(
|
||||
"/:channel_id_type/:channel/userid/:user/:year/:month",
|
||||
get_with(handlers::get_user_logs_by_id, |op| {
|
||||
get_with(handlers::get_user_logs_by_date_id, |op| {
|
||||
op.description("Get user logs in a channel from the given month")
|
||||
}),
|
||||
)
|
||||
|
@ -130,10 +138,12 @@ pub async fn run(app: App, mut shutdown_rx: ShutdownRx, bot_tx: Sender<BotMessag
|
|||
}),
|
||||
)
|
||||
.api_route("/optout", post(handlers::optout))
|
||||
.api_route("/capabilities", get(capabilities))
|
||||
.route("/docs", Redoc::new("/openapi.json").axum_route())
|
||||
.route("/openapi.json", get(serve_openapi))
|
||||
.route("/assets/*asset", get(frontend::static_asset))
|
||||
.fallback(frontend::static_asset)
|
||||
.layer(middleware::from_fn(capabilities_header_middleware))
|
||||
.layer(
|
||||
TraceLayer::new_for_http()
|
||||
.make_span_with(trace_layer::make_span_with)
|
||||
|
@ -172,6 +182,19 @@ pub fn parse_listen_addr(addr: &str) -> Result<SocketAddr, AddrParseError> {
|
|||
}
|
||||
}
|
||||
|
||||
async fn capabilities() -> Json<Vec<&'static str>> {
|
||||
Json(CAPABILITIES.to_vec())
|
||||
}
|
||||
|
||||
async fn capabilities_header_middleware<B>(request: Request<B>, next: Next<B>) -> Response {
|
||||
let mut response = next.run(request).await;
|
||||
response.headers_mut().insert(
|
||||
"x-rustlog-capabilities",
|
||||
CAPABILITIES.join(",").try_into().unwrap(),
|
||||
);
|
||||
response
|
||||
}
|
||||
|
||||
async fn metrics() -> impl IntoApiResponse {
|
||||
let metric_families = prometheus::gather();
|
||||
|
||||
|
|
|
@ -1,8 +1,7 @@
|
|||
use super::responders::logs::{JsonResponseType, LogsResponseType};
|
||||
use crate::logs::schema::{ChannelLogDate, UserLogDate};
|
||||
use schemars::JsonSchema;
|
||||
use serde::{Deserialize, Deserializer, Serialize};
|
||||
use std::{fmt::Display, num::ParseIntError};
|
||||
use std::fmt::Display;
|
||||
|
||||
#[derive(Serialize, JsonSchema)]
|
||||
pub struct ChannelsList {
|
||||
|
@ -44,32 +43,20 @@ pub struct UserLogsPath {
|
|||
}
|
||||
|
||||
#[derive(Deserialize, JsonSchema)]
|
||||
pub struct ChannelLogsPath {
|
||||
pub struct ChannelLogsByDatePath {
|
||||
#[serde(flatten)]
|
||||
pub channel_info: LogsPathChannel,
|
||||
#[serde(flatten)]
|
||||
pub date: ChannelLogDatePath,
|
||||
pub date: LogsPathDate,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, JsonSchema)]
|
||||
pub struct ChannelLogDatePath {
|
||||
pub struct LogsPathDate {
|
||||
pub year: String,
|
||||
pub month: String,
|
||||
pub day: String,
|
||||
}
|
||||
|
||||
impl TryFrom<ChannelLogDatePath> for ChannelLogDate {
|
||||
type Error = ParseIntError;
|
||||
|
||||
fn try_from(value: ChannelLogDatePath) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
year: value.year.parse()?,
|
||||
month: value.month.parse()?,
|
||||
day: value.day.parse()?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize, JsonSchema)]
|
||||
pub struct LogsPathChannel {
|
||||
pub channel_id_type: ChannelIdType,
|
||||
|
@ -116,17 +103,6 @@ where
|
|||
Ok(Option::<&str>::deserialize(deserializer)?.is_some())
|
||||
}
|
||||
|
||||
impl TryFrom<&UserLogsPath> for UserLogDate {
|
||||
type Error = ParseIntError;
|
||||
|
||||
fn try_from(params: &UserLogsPath) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
year: params.year.parse()?,
|
||||
month: params.month.parse()?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, JsonSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct AvailableLogs {
|
||||
|
|
Loading…
Reference in New Issue