fix json stream and update web

This commit is contained in:
boring_nick 2023-06-19 18:32:18 +03:00
parent 42e04d57b5
commit aa48e5f8f2
4 changed files with 73 additions and 43 deletions

View File

@ -1,12 +1,15 @@
use crate::{
logs::{parse_messages, parse_raw, schema::message::ResponseMessage, stream::LogsStream},
logs::{
parse_messages, parse_raw,
schema::message::{BasicMessage, FullMessage, ResponseMessage},
stream::LogsStream,
},
Result,
};
use futures::{stream::TryChunks, Future, Stream, StreamExt, TryStreamExt};
use rayon::prelude::{IntoParallelIterator, ParallelIterator};
use std::{
collections::VecDeque,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
@ -18,26 +21,67 @@ const FOOTER: &str = r#"]}"#;
const JSON_MESSAGE_SIZE: usize = 1024;
const CHUNK_SIZE: usize = 3000;
pub struct JsonLogsStream<T> {
pub enum JsonResponseType {
Basic,
Full,
}
pub struct JsonLogsStream {
inner: TryChunks<LogsStream>,
is_start: bool,
is_end: bool,
_message_type_marker: PhantomData<T>,
response_type: JsonResponseType,
}
impl<T> JsonLogsStream<T> {
pub fn new(stream: LogsStream) -> Self {
impl JsonLogsStream {
pub fn new(stream: LogsStream, response_type: JsonResponseType) -> Self {
let inner = stream.try_chunks(CHUNK_SIZE);
Self {
inner,
is_start: true,
is_end: false,
_message_type_marker: PhantomData::default(),
response_type,
}
}
fn serialize_chunk<'a, T: ResponseMessage<'a>>(
&mut self,
irc_messages: &'a [twitch::Message],
) -> Vec<u8> {
let mut messages: VecDeque<T> = parse_messages(irc_messages).collect();
let mut buf = Vec::with_capacity(JSON_MESSAGE_SIZE * irc_messages.len());
if self.is_start {
buf.extend_from_slice(HEADER.as_bytes());
self.is_start = false;
if let Some(mut message) = messages.pop_front() {
message.unescape_tags();
serde_json::to_writer(&mut buf, &message).unwrap();
}
}
let serialized_messages: Vec<_> = messages
.into_par_iter()
.map(|mut message| {
message.unescape_tags();
let mut message_buf = Vec::with_capacity(JSON_MESSAGE_SIZE);
serde_json::to_writer(&mut message_buf, &message).unwrap();
message_buf
})
.collect();
for message_buf in serialized_messages {
buf.push(b',');
buf.extend(message_buf);
}
buf
}
}
impl<T: for<'a> ResponseMessage<'a>> Stream for JsonLogsStream<T> {
impl Stream for JsonLogsStream {
type Item = Result<Vec<u8>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
@ -52,34 +96,14 @@ impl<T: for<'a> ResponseMessage<'a>> Stream for JsonLogsStream<T> {
Poll::Ready(Some(result)) => match result {
Ok(chunk) => {
let irc_messages = parse_raw(chunk);
let mut messages: VecDeque<T> = parse_messages(&irc_messages).collect();
let mut buf = Vec::with_capacity(JSON_MESSAGE_SIZE * irc_messages.len());
if self.is_start {
buf.extend_from_slice(HEADER.as_bytes());
self.is_start = false;
if let Some(mut message) = messages.pop_front() {
message.unescape_tags();
serde_json::to_writer(&mut buf, &message).unwrap();
let buf = match self.response_type {
JsonResponseType::Basic => {
self.serialize_chunk::<BasicMessage>(&irc_messages)
}
}
let serialized_messages: Vec<_> = messages
.into_par_iter()
.map(|mut message| {
message.unescape_tags();
let mut message_buf = Vec::with_capacity(JSON_MESSAGE_SIZE);
serde_json::to_writer(&mut message_buf, &message).unwrap();
message_buf
})
.collect();
for message_buf in serialized_messages {
buf.push(b',');
buf.extend(message_buf);
}
JsonResponseType::Full => {
self.serialize_chunk::<FullMessage>(&irc_messages)
}
};
Poll::Ready(Some(Ok(buf)))
}

View File

@ -3,6 +3,8 @@ mod json_stream;
mod ndjson_stream;
mod text_stream;
pub use json_stream::JsonResponseType;
use self::{
json_stream::JsonLogsStream, ndjson_stream::NdJsonLogsStream, text_stream::TextLogsStream,
};
@ -28,7 +30,7 @@ pub struct LogsResponse {
pub enum LogsResponseType {
Raw,
Text,
Json,
Json(JsonResponseType),
NdJson,
}
@ -53,10 +55,9 @@ impl IntoResponse for LogsResponse {
let stream = TextLogsStream::new(self.stream);
(set_content_type(&TEXT_PLAIN_UTF_8), StreamBody::new(stream)).into_response()
}
LogsResponseType::Json => {
let stream = JsonLogsStream::<FullMessage>::new(self.stream);
StreamBody::new(stream).into_response()
// (set_content_type(&APPLICATION_JSON), StreamBody::new(stream)).into_response()
LogsResponseType::Json(response_type) => {
let stream = JsonLogsStream::new(self.stream, response_type);
(set_content_type(&APPLICATION_JSON), StreamBody::new(stream)).into_response()
}
LogsResponseType::NdJson => {
let stream = NdJsonLogsStream::new(self.stream);

View File

@ -1,4 +1,4 @@
use super::responders::logs::LogsResponseType;
use super::responders::logs::{JsonResponseType, LogsResponseType};
use crate::logs::schema::{ChannelLogDate, UserLogDate};
use schemars::JsonSchema;
use serde::{Deserialize, Deserializer, Serialize};
@ -77,10 +77,13 @@ pub struct LogsPathChannel {
}
#[derive(Deserialize, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct LogsParams {
#[serde(default, deserialize_with = "deserialize_bool_param")]
pub json: bool,
#[serde(default, deserialize_with = "deserialize_bool_param")]
pub json_basic: bool,
#[serde(default, deserialize_with = "deserialize_bool_param")]
pub raw: bool,
#[serde(default, deserialize_with = "deserialize_bool_param")]
pub reverse: bool,
@ -92,8 +95,10 @@ impl LogsParams {
pub fn response_type(&self) -> LogsResponseType {
if self.raw {
LogsResponseType::Raw
} else if self.json_basic {
LogsResponseType::Json(JsonResponseType::Basic)
} else if self.json {
LogsResponseType::Json
LogsResponseType::Json(JsonResponseType::Full)
} else if self.ndjson {
LogsResponseType::NdJson
} else {

2
web

@ -1 +1 @@
Subproject commit 77511db98250fcc61155784bf8cf98a40528b682
Subproject commit b1b08e909f7d2138a1ee9ba7ca578a26c10b030c