split large channel log requests into multiple queries

boring_nick 2023-09-03 16:56:54 +03:00
parent 0b0e45fe96
commit c68e4c1112
2 changed files with 119 additions and 13 deletions
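The change in a nutshell: instead of running one ClickHouse query over the whole requested range, read_channel now splits any range longer than CHANNEL_MULTI_QUERY_SIZE_DAYS (14 days) into consecutive windows and opens one cursor per window. A minimal standalone sketch of that chunking follows; split_range is a hypothetical helper that only walks forward and clamps the last window, whereas the diff below also builds the windows back to front when logs_params.reverse is set and handles the tail window slightly differently.

use chrono::{DateTime, Duration, TimeZone, Utc};

// Cut [from, to) into consecutive windows of at most `interval`,
// the role CHANNEL_MULTI_QUERY_SIZE_DAYS plays in the diff below.
fn split_range(
    from: DateTime<Utc>,
    to: DateTime<Utc>,
    interval: Duration,
) -> Vec<(DateTime<Utc>, DateTime<Utc>)> {
    let mut windows = Vec::new();
    let mut current_from = from;
    while current_from < to {
        let current_to = (current_from + interval).min(to);
        windows.push((current_from, current_to));
        current_from = current_to;
    }
    windows
}

fn main() {
    let from = Utc.with_ymd_and_hms(2023, 1, 1, 0, 0, 0).unwrap();
    let to = Utc.with_ymd_and_hms(2023, 2, 5, 0, 0, 0).unwrap();
    // A 35-day range with a 14-day window yields three queries: 14 + 14 + 7 days.
    for (start, end) in split_range(from, to, Duration::days(14)) {
        println!("{start} .. {end}");
    }
}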


@@ -10,10 +10,12 @@ use crate::{
     web::schema::AvailableLogDate,
     Result,
 };
-use chrono::{Datelike, NaiveDateTime};
-use clickhouse::Client;
+use chrono::{DateTime, Datelike, Duration, NaiveDateTime, Utc};
+use clickhouse::{query::RowCursor, Client};
 use rand::{seq::IteratorRandom, thread_rng};
-use tracing::info;
+use tracing::{debug, info};
+const CHANNEL_MULTI_QUERY_SIZE_DAYS: i64 = 14;
 pub async fn read_channel(
     db: &Client,
@@ -25,20 +27,92 @@ pub async fn read_channel(
     } else {
         "ASC"
     };
-    let mut query = format!("SELECT raw FROM message WHERE channel_id = ? AND timestamp >= ? AND timestamp < ? ORDER BY timestamp {suffix}");
-    apply_limit_offset(
-        &mut query,
-        params.logs_params.limit,
-        params.logs_params.offset,
-    );
+    let mut query = format!("SELECT raw FROM message WHERE channel_id = ? AND timestamp >= ? AND timestamp < ? ORDER BY timestamp {suffix}");
+    let interval = Duration::days(CHANNEL_MULTI_QUERY_SIZE_DAYS);
+    if params.to - params.from > interval {
+        let count = db
+            .query("SELECT count() FROM (SELECT timestamp FROM message WHERE channel_id = ? AND timestamp >= ? AND timestamp < ? LIMIT 1)")
+            .bind(channel_id)
+            .bind(params.from.timestamp_millis() as f64 / 1000.0)
+            .bind(params.to.timestamp_millis() as f64 / 1000.0)
+            .fetch_one::<i32>().await?;
+        if count == 0 {
+            return Err(Error::NotFound);
+        }
+        let mut streams = Vec::with_capacity(1);
+        if params.logs_params.reverse {
+            let mut current_to = params.to;
+            let mut current_from = current_to - interval;
+            loop {
+                let cursor = next_cursor(db, &query, channel_id, current_from, current_to)?;
+                streams.push(cursor);
+                current_from -= interval;
+                current_to -= interval;
+                if current_from > params.from {
+                    let cursor = next_cursor(db, &query, channel_id, params.from, current_to)?;
+                    streams.push(cursor);
+                    break;
+                }
+            }
+        } else {
+            let mut current_from = params.from;
+            let mut current_to = current_from + interval;
+            loop {
+                let cursor = next_cursor(db, &query, channel_id, current_from, current_to)?;
+                streams.push(cursor);
+                current_from += interval;
+                current_to += interval;
+                if current_to > params.to {
+                    let cursor = next_cursor(db, &query, channel_id, current_from, params.to)?;
+                    streams.push(cursor);
+                    break;
+                }
+            }
+        }
+        debug!("Using {} queries for multi-query stream", streams.len());
+        LogsStream::new_multi_query(streams)
+    } else {
+        apply_limit_offset(
+            &mut query,
+            params.logs_params.limit,
+            params.logs_params.offset,
+        );
+        let cursor = db
+            .query(&query)
+            .bind(channel_id)
+            .bind(params.from.timestamp_millis() as f64 / 1000.0)
+            .bind(params.to.timestamp_millis() as f64 / 1000.0)
+            .fetch()?;
+        LogsStream::new_cursor(cursor).await
+    }
+}
+fn next_cursor(
+    db: &Client,
+    query: &str,
+    channel_id: &str,
+    from: DateTime<Utc>,
+    to: DateTime<Utc>,
+) -> Result<RowCursor<String>> {
     let cursor = db
-        .query(&query)
+        .query(query)
         .bind(channel_id)
-        .bind(params.from.timestamp_millis() as f64 / 1000.0)
-        .bind(params.to.timestamp_millis() as f64 / 1000.0)
+        .bind(from.timestamp_millis() as f64 / 1000.0)
+        .bind(to.timestamp_millis() as f64 / 1000.0)
         .fetch()?;
-    LogsStream::new_cursor(cursor).await
+    Ok(cursor)
 }
 pub async fn read_user(

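In both the single-query path and next_cursor above, the chrono timestamps are bound as fractional Unix seconds (timestamp_millis() as f64 / 1000.0), which is the form the ? placeholders are compared against in the message table. A tiny illustration of just that conversion, using a made-up instant:

use chrono::{TimeZone, Utc};

fn main() {
    // Hypothetical instant standing in for params.from / params.to.
    let from = Utc.with_ymd_and_hms(2023, 9, 3, 13, 56, 54).unwrap();

    // Same conversion as in read_channel / next_cursor: milliseconds -> fractional seconds.
    let bound = from.timestamp_millis() as f64 / 1000.0;

    // With no sub-second part this equals the plain Unix timestamp.
    assert_eq!(bound, from.timestamp() as f64);
    println!("{bound}");
}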

@@ -17,6 +17,10 @@ pub enum LogsStream {
         cursor: RowCursor<String>,
         first_item: Option<String>,
     },
+    MultiQuery {
+        cursors: Vec<RowCursor<String>>,
+        current: usize,
+    },
     Provided(Iter<IntoIter<String>>),
 }
@@ -37,6 +41,17 @@ impl LogsStream {
             Ok(Self::Provided(stream::iter(iter)))
         }
     }
+    pub fn new_multi_query(cursors: Vec<RowCursor<String>>) -> Result<Self> {
+        // if streams.is_empty() {
+        //     return Err(Error::NotFound);
+        // }
+        Ok(Self::MultiQuery {
+            cursors,
+            current: 0,
+        })
+    }
 }
 impl Stream for LogsStream {
@@ -55,6 +70,23 @@
                 }
             }
             LogsStream::Provided(iter) => Pin::new(iter).poll_next(cx).map(|item| item.map(Ok)),
+            LogsStream::MultiQuery { cursors, current } => match cursors.get_mut(*current) {
+                Some(cursor) => {
+                    let next_line_poll = {
+                        let fut = cursor.next();
+                        pin!(fut);
+                        fut.poll(cx)
+                    };
+                    if let Poll::Ready(Ok(None)) = next_line_poll {
+                        *current += 1;
+                        self.poll_next(cx)
+                    } else {
+                        next_line_poll.map(|result| result.map_err(|err| err.into()).transpose())
+                    }
+                }
+                None => Poll::Ready(None),
+            },
         }
     }
 }
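The MultiQuery arm above drains its cursors strictly in order: it polls the current cursor, and only when that cursor reports Ok(None), i.e. it is exhausted, does it bump current and poll again; once current runs past the last cursor the combined stream yields None. The same control flow with async and the ClickHouse types stripped away, as a plain-iterator analogue (MultiIter is illustrative only, not code from the commit):

/// Iterator analogue of LogsStream::MultiQuery: yields everything from the
/// first source, then the second, and so on.
struct MultiIter<T> {
    sources: Vec<std::vec::IntoIter<T>>,
    current: usize,
}

impl<T> Iterator for MultiIter<T> {
    type Item = T;

    fn next(&mut self) -> Option<T> {
        loop {
            // No sources left: the combined sequence is finished,
            // like Poll::Ready(None) in poll_next.
            let source = self.sources.get_mut(self.current)?;
            match source.next() {
                Some(item) => return Some(item),
                // Current source exhausted: advance to the next one,
                // mirroring `*current += 1` followed by re-polling.
                None => self.current += 1,
            }
        }
    }
}

fn main() {
    let multi = MultiIter {
        sources: vec![vec![1, 2].into_iter(), vec![].into_iter(), vec![3].into_iter()],
        current: 0,
    };
    // Everything from the first source, then the next: [1, 2, 3].
    assert_eq!(multi.collect::<Vec<_>>(), vec![1, 2, 3]);
}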