add support for ndjson log queries

boring_nick 2023-06-14 08:31:20 +03:00
parent 8d9115364b
commit f199a67447
4 changed files with 83 additions and 6 deletions

View File

@@ -44,6 +44,7 @@ services:
- Significantly better storage efficiency (2x+ improvement) thanks to not duplicating log files and better compression (using ZSTD in Clickhouse)
- Blazing fast log queries with response streaming and a [highly performant IRC parser](https://github.com/jprochazk/twitch-rs)
- Support for ndjson log responses
## Migrating from justlog
See [MIGRATION.md](./docs/MIGRATION.md)
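
To illustrate the ndjson support added to the feature list above (example only, not part of the commit): an ndjson response carries one JSON object per line rather than a single JSON array, so clients can parse messages as they stream in. The field names below are hypothetical stand-ins for whatever the Message schema serializes:

{"timestamp":"2023-06-14T08:31:20.000Z","channel":"somechannel","username":"someuser","text":"hello"}
{"timestamp":"2023-06-14T08:31:21.000Z","channel":"somechannel","username":"someuser","text":"world"}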

View File

@@ -1,8 +1,11 @@
mod join_iter;
mod json_stream;
mod ndjson_stream;
mod text_stream;
use self::{json_stream::JsonLogsStream, text_stream::TextLogsStream};
use self::{
json_stream::JsonLogsStream, ndjson_stream::NdJsonLogsStream, text_stream::TextLogsStream,
};
use crate::logs::{schema::Message, stream::LogsStream};
use aide::OperationOutput;
use axum::{
@@ -13,10 +16,7 @@ use axum::{
};
use futures::TryStreamExt;
use indexmap::IndexMap;
use mime_guess::{
mime::{APPLICATION_JSON, TEXT_PLAIN_UTF_8},
Mime,
};
use mime_guess::mime::{APPLICATION_JSON, TEXT_PLAIN_UTF_8};
use reqwest::header::CONTENT_TYPE;
use schemars::JsonSchema;
@@ -29,6 +29,7 @@ pub enum LogsResponseType {
Raw,
Text,
Json,
NdJson,
}
/// Used for schema only, actual serialization is manual
@@ -56,11 +57,19 @@ impl IntoResponse for LogsResponse {
let stream = JsonLogsStream::new(self.stream);
(set_content_type(&APPLICATION_JSON), StreamBody::new(stream)).into_response()
}
LogsResponseType::NdJson => {
let stream = NdJsonLogsStream::new(self.stream);
(
set_content_type(&"application/x-ndjson"),
StreamBody::new(stream),
)
.into_response()
}
}
}
}
fn set_content_type(content_type: &'static Mime) -> impl IntoResponseParts {
fn set_content_type(content_type: &'static impl AsRef<str>) -> impl IntoResponseParts {
[(
CONTENT_TYPE,
HeaderValue::from_static(content_type.as_ref()),
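
A standalone sketch (not from the commit; the helper name is made up) of why the relaxed bound on set_content_type works: both the mime constants and a plain &'static str implement AsRef<str>, so the same helper serves the JSON and ndjson arms.

use axum::http::HeaderValue;
use mime_guess::mime::APPLICATION_JSON;

// Hypothetical stand-in for set_content_type: any `&'static` value that
// implements AsRef<str> can back a static header value.
fn content_type_value(value: &'static impl AsRef<str>) -> HeaderValue {
    HeaderValue::from_static(value.as_ref())
}

fn main() {
    let json = content_type_value(&APPLICATION_JSON);
    let ndjson = content_type_value(&"application/x-ndjson");
    assert_eq!(json.to_str().unwrap(), "application/json");
    assert_eq!(ndjson.to_str().unwrap(), "application/x-ndjson");
}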

View File

@@ -0,0 +1,63 @@
use crate::{
logs::{parse_messages, parse_raw, stream::LogsStream},
Result,
};
use futures::{stream::TryChunks, Future, Stream, StreamExt, TryStreamExt};
use rayon::prelude::{IntoParallelIterator, ParallelIterator};
use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio::pin;
/// Rough estimation of how big a single message is in JSON format
const JSON_MESSAGE_SIZE: usize = 1024;
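/// How many messages are pulled from the underlying stream and serialized per batch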
const CHUNK_SIZE: usize = 3000;
pub struct NdJsonLogsStream {
inner: TryChunks<LogsStream>,
}
impl NdJsonLogsStream {
pub fn new(stream: LogsStream) -> Self {
let inner = stream.try_chunks(CHUNK_SIZE);
Self { inner }
}
}
impl Stream for NdJsonLogsStream {
type Item = Result<Vec<u8>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let fut = self.inner.next();
pin!(fut);
fut.poll(cx).map(|maybe_result| {
maybe_result.map(|result| match result {
Ok(chunk) => {
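// Parse this batch of raw IRC log lines into messages, then serialize each one to JSON in parallel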
let irc_messages = parse_raw(chunk);
let messages: Vec<_> = parse_messages(&irc_messages).collect();
let mut buf = Vec::with_capacity(JSON_MESSAGE_SIZE * messages.len());
let serialized_messages: Vec<_> = messages
.into_par_iter()
.map(|message| {
let mut message_buf = Vec::with_capacity(JSON_MESSAGE_SIZE);
serde_json::to_writer(&mut message_buf, &message).unwrap();
message_buf
})
.collect();
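// Frame the output as ndjson: each serialized message becomes one line of the response body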
for message_buf in serialized_messages {
buf.extend(message_buf);
buf.push(b'\n');
}
Ok(buf)
}
Err(err) => Err(err.1),
})
})
}
}
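
A sketch of the consuming side (not part of the commit; names assumed): since every serialized message is terminated with a newline, the concatenated chunks form plain ndjson that a client can parse line by line.

use serde_json::Value;

// Parse an ndjson body into loosely typed JSON values, one per non-empty line.
fn parse_ndjson(body: &str) -> Vec<Value> {
    body.lines()
        .filter(|line| !line.is_empty())
        .map(|line| serde_json::from_str(line).expect("each line should be a complete JSON object"))
        .collect()
}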

View File

@@ -84,6 +84,8 @@ pub struct LogsParams {
pub raw: bool,
#[serde(default, deserialize_with = "deserialize_bool_param")]
pub reverse: bool,
#[serde(default, deserialize_with = "deserialize_bool_param")]
pub ndjson: bool,
}
impl LogsParams {
@@ -92,6 +94,8 @@ impl LogsParams {
LogsResponseType::Raw
} else if self.json {
LogsResponseType::Json
} else if self.ndjson {
LogsResponseType::NdJson
} else {
LogsResponseType::Text
}
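
For illustration (simplified, not from the commit): the branch order above means raw outranks json, which outranks the new ndjson flag, with plain text as the fallback, so a query such as ?ndjson=true (assuming deserialize_bool_param accepts "true") only selects ndjson when neither raw nor json is also set.

// Simplified stand-in for LogsParams::response_type.
fn pick_format(raw: bool, json: bool, ndjson: bool) -> &'static str {
    if raw {
        "raw"
    } else if json {
        "json"
    } else if ndjson {
        "ndjson"
    } else {
        "text"
    }
}

fn main() {
    assert_eq!(pick_format(false, false, true), "ndjson");
    // `json` outranks `ndjson` when both flags are passed.
    assert_eq!(pick_format(false, true, true), "json");
}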