Compare commits
3 Commits
0b0e45fe96
...
b7a6bfbf7b
Author | SHA1 | Date |
---|---|---|
boring_nick | b7a6bfbf7b | |
boring_nick | 26084111f7 | |
boring_nick | c68e4c1112 |
|
@ -10,10 +10,12 @@ use crate::{
|
|||
web::schema::AvailableLogDate,
|
||||
Result,
|
||||
};
|
||||
use chrono::{Datelike, NaiveDateTime};
|
||||
use clickhouse::Client;
|
||||
use chrono::{DateTime, Datelike, Duration, NaiveDateTime, Utc};
|
||||
use clickhouse::{query::RowCursor, Client};
|
||||
use rand::{seq::IteratorRandom, thread_rng};
|
||||
use tracing::info;
|
||||
use tracing::{debug, info};
|
||||
|
||||
const CHANNEL_MULTI_QUERY_SIZE_DAYS: i64 = 14;
|
||||
|
||||
pub async fn read_channel(
|
||||
db: &Client,
|
||||
|
@ -25,20 +27,78 @@ pub async fn read_channel(
|
|||
} else {
|
||||
"ASC"
|
||||
};
|
||||
let mut query = format!("SELECT raw FROM message WHERE channel_id = ? AND timestamp >= ? AND timestamp < ? ORDER BY timestamp {suffix}");
|
||||
apply_limit_offset(
|
||||
&mut query,
|
||||
params.logs_params.limit,
|
||||
params.logs_params.offset,
|
||||
);
|
||||
|
||||
let mut query = format!("SELECT raw FROM message WHERE channel_id = ? AND timestamp >= ? AND timestamp < ? ORDER BY timestamp {suffix}");
|
||||
|
||||
let interval = Duration::days(CHANNEL_MULTI_QUERY_SIZE_DAYS);
|
||||
if params.to - params.from > interval {
|
||||
let count = db
|
||||
.query("SELECT count() FROM (SELECT timestamp FROM message WHERE channel_id = ? AND timestamp >= ? AND timestamp < ? LIMIT 1)")
|
||||
.bind(channel_id)
|
||||
.bind(params.from.timestamp_millis() as f64 / 1000.0)
|
||||
.bind(params.to.timestamp_millis() as f64 / 1000.0)
|
||||
.fetch_one::<i32>().await?;
|
||||
if count == 0 {
|
||||
return Err(Error::NotFound);
|
||||
}
|
||||
|
||||
let mut streams = Vec::with_capacity(1);
|
||||
|
||||
let mut current_from = params.from;
|
||||
let mut current_to = current_from + interval;
|
||||
|
||||
loop {
|
||||
let cursor = next_cursor(db, &query, channel_id, current_from, current_to)?;
|
||||
streams.push(cursor);
|
||||
|
||||
current_from += interval;
|
||||
current_to += interval;
|
||||
|
||||
if current_to > params.to {
|
||||
let cursor = next_cursor(db, &query, channel_id, current_from, params.to)?;
|
||||
streams.push(cursor);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if params.logs_params.reverse {
|
||||
streams.reverse();
|
||||
}
|
||||
|
||||
debug!("Using {} queries for multi-query stream", streams.len());
|
||||
|
||||
LogsStream::new_multi_query(streams)
|
||||
} else {
|
||||
apply_limit_offset(
|
||||
&mut query,
|
||||
params.logs_params.limit,
|
||||
params.logs_params.offset,
|
||||
);
|
||||
|
||||
let cursor = db
|
||||
.query(&query)
|
||||
.bind(channel_id)
|
||||
.bind(params.from.timestamp_millis() as f64 / 1000.0)
|
||||
.bind(params.to.timestamp_millis() as f64 / 1000.0)
|
||||
.fetch()?;
|
||||
LogsStream::new_cursor(cursor).await
|
||||
}
|
||||
}
|
||||
|
||||
fn next_cursor(
|
||||
db: &Client,
|
||||
query: &str,
|
||||
channel_id: &str,
|
||||
from: DateTime<Utc>,
|
||||
to: DateTime<Utc>,
|
||||
) -> Result<RowCursor<String>> {
|
||||
let cursor = db
|
||||
.query(&query)
|
||||
.query(query)
|
||||
.bind(channel_id)
|
||||
.bind(params.from.timestamp_millis() as f64 / 1000.0)
|
||||
.bind(params.to.timestamp_millis() as f64 / 1000.0)
|
||||
.bind(from.timestamp_millis() as f64 / 1000.0)
|
||||
.bind(to.timestamp_millis() as f64 / 1000.0)
|
||||
.fetch()?;
|
||||
LogsStream::new_cursor(cursor).await
|
||||
Ok(cursor)
|
||||
}
|
||||
|
||||
pub async fn read_user(
|
||||
|
|
|
@ -17,6 +17,10 @@ pub enum LogsStream {
|
|||
cursor: RowCursor<String>,
|
||||
first_item: Option<String>,
|
||||
},
|
||||
MultiQuery {
|
||||
cursors: Vec<RowCursor<String>>,
|
||||
current: usize,
|
||||
},
|
||||
Provided(Iter<IntoIter<String>>),
|
||||
}
|
||||
|
||||
|
@ -37,6 +41,17 @@ impl LogsStream {
|
|||
Ok(Self::Provided(stream::iter(iter)))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_multi_query(cursors: Vec<RowCursor<String>>) -> Result<Self> {
|
||||
// if streams.is_empty() {
|
||||
// return Err(Error::NotFound);
|
||||
// }
|
||||
|
||||
Ok(Self::MultiQuery {
|
||||
cursors,
|
||||
current: 0,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for LogsStream {
|
||||
|
@ -55,6 +70,23 @@ impl Stream for LogsStream {
|
|||
}
|
||||
}
|
||||
LogsStream::Provided(iter) => Pin::new(iter).poll_next(cx).map(|item| item.map(Ok)),
|
||||
LogsStream::MultiQuery { cursors, current } => match cursors.get_mut(*current) {
|
||||
Some(cursor) => {
|
||||
let next_line_poll = {
|
||||
let fut = cursor.next();
|
||||
pin!(fut);
|
||||
fut.poll(cx)
|
||||
};
|
||||
|
||||
if let Poll::Ready(Ok(None)) = next_line_poll {
|
||||
*current += 1;
|
||||
self.poll_next(cx)
|
||||
} else {
|
||||
next_line_poll.map(|result| result.map_err(|err| err.into()).transpose())
|
||||
}
|
||||
}
|
||||
None => Poll::Ready(None),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,7 +15,12 @@ use aide::{
|
|||
openapi::OpenApi,
|
||||
redoc::Redoc,
|
||||
};
|
||||
use axum::{middleware, response::IntoResponse, Extension, Json, ServiceExt};
|
||||
use axum::{
|
||||
http::Request,
|
||||
middleware::{self, Next},
|
||||
response::{IntoResponse, Response},
|
||||
Extension, Json, ServiceExt,
|
||||
};
|
||||
use axum_prometheus::PrometheusMetricLayerBuilder;
|
||||
use prometheus::TextEncoder;
|
||||
use std::{
|
||||
|
@ -30,6 +35,8 @@ use tower_http::{
|
|||
};
|
||||
use tracing::{debug, info};
|
||||
|
||||
const CAPABILITIES: &[&str] = &["arbitrary-range-query"];
|
||||
|
||||
pub async fn run(app: App, mut shutdown_rx: ShutdownRx, bot_tx: Sender<BotMessage>) {
|
||||
aide::gen::on_error(|error| {
|
||||
panic!("Could not generate docs: {error}");
|
||||
|
@ -131,10 +138,12 @@ pub async fn run(app: App, mut shutdown_rx: ShutdownRx, bot_tx: Sender<BotMessag
|
|||
}),
|
||||
)
|
||||
.api_route("/optout", post(handlers::optout))
|
||||
.api_route("/capabilities", get(capabilities))
|
||||
.route("/docs", Redoc::new("/openapi.json").axum_route())
|
||||
.route("/openapi.json", get(serve_openapi))
|
||||
.route("/assets/*asset", get(frontend::static_asset))
|
||||
.fallback(frontend::static_asset)
|
||||
.layer(middleware::from_fn(capabilities_header_middleware))
|
||||
.layer(
|
||||
TraceLayer::new_for_http()
|
||||
.make_span_with(trace_layer::make_span_with)
|
||||
|
@ -173,6 +182,19 @@ pub fn parse_listen_addr(addr: &str) -> Result<SocketAddr, AddrParseError> {
|
|||
}
|
||||
}
|
||||
|
||||
async fn capabilities() -> Json<Vec<&'static str>> {
|
||||
Json(CAPABILITIES.to_vec())
|
||||
}
|
||||
|
||||
async fn capabilities_header_middleware<B>(request: Request<B>, next: Next<B>) -> Response {
|
||||
let mut response = next.run(request).await;
|
||||
response.headers_mut().insert(
|
||||
"x-rustlog-capabilities",
|
||||
CAPABILITIES.join(",").try_into().unwrap(),
|
||||
);
|
||||
response
|
||||
}
|
||||
|
||||
async fn metrics() -> impl IntoApiResponse {
|
||||
let metric_families = prometheus::gather();
|
||||
|
||||
|
|
Loading…
Reference in New Issue