Compare commits

3 Commits

Author        SHA1          Message                                                    Date
boring_nick   b7a6bfbf7b    implement capabilities reporting                           2023-09-03 21:24:32 +03:00
boring_nick   26084111f7    cleanup                                                    2023-09-03 16:58:29 +03:00
boring_nick   c68e4c1112    split large channel log requests into multiple queries    2023-09-03 16:56:54 +03:00
3 changed files with 128 additions and 14 deletions

View File

@@ -10,10 +10,12 @@ use crate::{
     web::schema::AvailableLogDate,
     Result,
 };
-use chrono::{Datelike, NaiveDateTime};
-use clickhouse::Client;
+use chrono::{DateTime, Datelike, Duration, NaiveDateTime, Utc};
+use clickhouse::{query::RowCursor, Client};
 use rand::{seq::IteratorRandom, thread_rng};
-use tracing::info;
+use tracing::{debug, info};
 
+const CHANNEL_MULTI_QUERY_SIZE_DAYS: i64 = 14;
+
 pub async fn read_channel(
     db: &Client,
@@ -25,20 +27,78 @@ pub async fn read_channel(
     } else {
         "ASC"
     };
 
-    let mut query = format!("SELECT raw FROM message WHERE channel_id = ? AND timestamp >= ? AND timestamp < ? ORDER BY timestamp {suffix}");
-    apply_limit_offset(
-        &mut query,
-        params.logs_params.limit,
-        params.logs_params.offset,
-    );
+    let mut query = format!("SELECT raw FROM message WHERE channel_id = ? AND timestamp >= ? AND timestamp < ? ORDER BY timestamp {suffix}");
+    let interval = Duration::days(CHANNEL_MULTI_QUERY_SIZE_DAYS);
+
+    if params.to - params.from > interval {
+        let count = db
+            .query("SELECT count() FROM (SELECT timestamp FROM message WHERE channel_id = ? AND timestamp >= ? AND timestamp < ? LIMIT 1)")
+            .bind(channel_id)
+            .bind(params.from.timestamp_millis() as f64 / 1000.0)
+            .bind(params.to.timestamp_millis() as f64 / 1000.0)
+            .fetch_one::<i32>().await?;
+        if count == 0 {
+            return Err(Error::NotFound);
+        }
+
+        let mut streams = Vec::with_capacity(1);
+
+        let mut current_from = params.from;
+        let mut current_to = current_from + interval;
+
+        loop {
+            let cursor = next_cursor(db, &query, channel_id, current_from, current_to)?;
+            streams.push(cursor);
+
+            current_from += interval;
+            current_to += interval;
+
+            if current_to > params.to {
+                let cursor = next_cursor(db, &query, channel_id, current_from, params.to)?;
+                streams.push(cursor);
+                break;
+            }
+        }
+
+        if params.logs_params.reverse {
+            streams.reverse();
+        }
+
+        debug!("Using {} queries for multi-query stream", streams.len());
+
+        LogsStream::new_multi_query(streams)
+    } else {
+        apply_limit_offset(
+            &mut query,
+            params.logs_params.limit,
+            params.logs_params.offset,
+        );
+
+        let cursor = db
+            .query(&query)
+            .bind(channel_id)
+            .bind(params.from.timestamp_millis() as f64 / 1000.0)
+            .bind(params.to.timestamp_millis() as f64 / 1000.0)
+            .fetch()?;
+        LogsStream::new_cursor(cursor).await
+    }
+}
+
+fn next_cursor(
+    db: &Client,
+    query: &str,
+    channel_id: &str,
+    from: DateTime<Utc>,
+    to: DateTime<Utc>,
+) -> Result<RowCursor<String>> {
     let cursor = db
-        .query(&query)
+        .query(query)
         .bind(channel_id)
-        .bind(params.from.timestamp_millis() as f64 / 1000.0)
-        .bind(params.to.timestamp_millis() as f64 / 1000.0)
+        .bind(from.timestamp_millis() as f64 / 1000.0)
+        .bind(to.timestamp_millis() as f64 / 1000.0)
         .fetch()?;
-    LogsStream::new_cursor(cursor).await
+    Ok(cursor)
 }
 
 pub async fn read_user(
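
The core of the change: when the requested range exceeds CHANNEL_MULTI_QUERY_SIZE_DAYS, read_channel first runs a cheap existence check (a SELECT count() over a LIMIT 1 subquery, returning NotFound early), then walks the range in fixed 14-day windows and opens one cursor per window via next_cursor. A minimal standalone sketch of that windowing, using a hypothetical split_range helper that clamps the final window instead of special-casing it inside the loop the way the diff does:

    use chrono::{DateTime, Duration, Utc};

    // Split [from, to) into consecutive windows of at most `days` days.
    // The final window is clamped to `to`, mirroring the
    // `current_to > params.to` branch in read_channel above.
    fn split_range(
        from: DateTime<Utc>,
        to: DateTime<Utc>,
        days: i64,
    ) -> Vec<(DateTime<Utc>, DateTime<Utc>)> {
        let interval = Duration::days(days);
        let mut windows = Vec::new();
        let mut current_from = from;
        while current_from < to {
            let current_to = (current_from + interval).min(to);
            windows.push((current_from, current_to));
            current_from = current_to;
        }
        windows
    }

    fn main() {
        let to = Utc::now();
        let from = to - Duration::days(30);
        for (start, end) in split_range(from, to, 14) {
            println!("{start} -> {end}");
        }
    }

A 30-day request yields two full 14-day windows plus a 2-day remainder; each window becomes one ClickHouse cursor, and the cursor list is reversed for descending (reverse) queries so results still stream in the requested order.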

View File

@@ -17,6 +17,10 @@ pub enum LogsStream {
         cursor: RowCursor<String>,
         first_item: Option<String>,
     },
+    MultiQuery {
+        cursors: Vec<RowCursor<String>>,
+        current: usize,
+    },
     Provided(Iter<IntoIter<String>>),
 }
@@ -37,6 +41,17 @@ impl LogsStream {
             Ok(Self::Provided(stream::iter(iter)))
         }
     }
+
+    pub fn new_multi_query(cursors: Vec<RowCursor<String>>) -> Result<Self> {
+        // if streams.is_empty() {
+        //     return Err(Error::NotFound);
+        // }
+        Ok(Self::MultiQuery {
+            cursors,
+            current: 0,
+        })
+    }
 }
 
 impl Stream for LogsStream {
@@ -55,6 +70,23 @@ impl Stream for LogsStream {
                 }
             }
             LogsStream::Provided(iter) => Pin::new(iter).poll_next(cx).map(|item| item.map(Ok)),
+            LogsStream::MultiQuery { cursors, current } => match cursors.get_mut(*current) {
+                Some(cursor) => {
+                    let next_line_poll = {
+                        let fut = cursor.next();
+                        pin!(fut);
+                        fut.poll(cx)
+                    };
+
+                    if let Poll::Ready(Ok(None)) = next_line_poll {
+                        *current += 1;
+                        self.poll_next(cx)
+                    } else {
+                        next_line_poll.map(|result| result.map_err(|err| err.into()).transpose())
+                    }
+                }
+                None => Poll::Ready(None),
+            },
         }
     }
 }
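
clickhouse's RowCursor is not itself a Stream, so the MultiQuery arm pins the cursor's next() future on the stack just long enough to poll it once, advances current when a cursor reports Ok(None), and immediately re-polls so the consumer never sees the seam. The same advance-on-exhaustion shape, reduced to a toy Stream over plain iterators (MultiSource is an assumed name; only the futures crate is used):

    use futures::stream::{Stream, StreamExt};
    use std::pin::Pin;
    use std::task::{Context, Poll};

    // Toy analogue of LogsStream::MultiQuery: yield from the source at
    // `current`, moving on to the next source once it runs dry.
    struct MultiSource {
        sources: Vec<std::vec::IntoIter<String>>,
        current: usize,
    }

    impl Stream for MultiSource {
        type Item = String;

        fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<String>> {
            let this = self.get_mut(); // fine: MultiSource is Unpin
            loop {
                match this.sources.get_mut(this.current) {
                    Some(iter) => match iter.next() {
                        Some(item) => return Poll::Ready(Some(item)),
                        // Current source exhausted: advance and retry on the
                        // same poll, like the `*current += 1` branch above.
                        None => this.current += 1,
                    },
                    // No sources left: the combined stream ends.
                    None => return Poll::Ready(None),
                }
            }
        }
    }

    fn main() {
        let stream = MultiSource {
            sources: vec![
                vec!["a".into(), "b".into()].into_iter(),
                vec!["c".into()].into_iter(),
            ],
            current: 0,
        };
        let all: Vec<String> = futures::executor::block_on(stream.collect());
        assert_eq!(all, ["a", "b", "c"]);
    }

Draining cursors strictly in order (rather than, say, futures::stream::select_all, which interleaves whatever is ready first) is what keeps the merged output in timestamp order.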

View File

@@ -15,7 +15,12 @@ use aide::{
     openapi::OpenApi,
     redoc::Redoc,
 };
-use axum::{middleware, response::IntoResponse, Extension, Json, ServiceExt};
+use axum::{
+    http::Request,
+    middleware::{self, Next},
+    response::{IntoResponse, Response},
+    Extension, Json, ServiceExt,
+};
 use axum_prometheus::PrometheusMetricLayerBuilder;
 use prometheus::TextEncoder;
 use std::{
@@ -30,6 +35,8 @@ use tower_http::{
 };
 use tracing::{debug, info};
 
+const CAPABILITIES: &[&str] = &["arbitrary-range-query"];
+
 pub async fn run(app: App, mut shutdown_rx: ShutdownRx, bot_tx: Sender<BotMessage>) {
     aide::gen::on_error(|error| {
         panic!("Could not generate docs: {error}");
@@ -131,10 +138,12 @@ pub async fn run(app: App, mut shutdown_rx: ShutdownRx, bot_tx: Sender<BotMessag
             }),
         )
         .api_route("/optout", post(handlers::optout))
+        .api_route("/capabilities", get(capabilities))
         .route("/docs", Redoc::new("/openapi.json").axum_route())
         .route("/openapi.json", get(serve_openapi))
         .route("/assets/*asset", get(frontend::static_asset))
         .fallback(frontend::static_asset)
+        .layer(middleware::from_fn(capabilities_header_middleware))
         .layer(
             TraceLayer::new_for_http()
                 .make_span_with(trace_layer::make_span_with)
@@ -173,6 +182,19 @@ pub fn parse_listen_addr(addr: &str) -> Result<SocketAddr, AddrParseError> {
     }
 }
 
+async fn capabilities() -> Json<Vec<&'static str>> {
+    Json(CAPABILITIES.to_vec())
+}
+
+async fn capabilities_header_middleware<B>(request: Request<B>, next: Next<B>) -> Response {
+    let mut response = next.run(request).await;
+    response.headers_mut().insert(
+        "x-rustlog-capabilities",
+        CAPABILITIES.join(",").try_into().unwrap(),
+    );
+    response
+}
+
 async fn metrics() -> impl IntoApiResponse {
     let metric_families = prometheus::gather();
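
With this, clients have two ways to detect what a rustlog instance supports: GET /capabilities returns the list as JSON, and the middleware stamps the same list on every response as an x-rustlog-capabilities header. A hypothetical client-side check (assumes reqwest with the json feature plus tokio; the host is a placeholder):

    use std::error::Error;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn Error>> {
        // Placeholder host for some rustlog instance.
        let resp = reqwest::get("https://logs.example.com/capabilities").await?;

        // The same list as CAPABILITIES above, e.g. ["arbitrary-range-query"].
        let caps: Vec<String> = resp.json().await?;

        if caps.iter().any(|c| c == "arbitrary-range-query") {
            // The server splits wide ranges into multiple queries itself,
            // so the client can request arbitrary from/to bounds directly.
            println!("server supports arbitrary range queries");
        }
        Ok(())
    }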