Merge pull request #9 from boring-nick/basic-json
Add jsonBasic logs query type
This commit is contained in:
commit
63652282b2
|
@ -2,7 +2,7 @@ pub mod extract;
|
|||
pub mod schema;
|
||||
pub mod stream;
|
||||
|
||||
use self::schema::Message;
|
||||
use self::schema::message::ResponseMessage;
|
||||
use rayon::prelude::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator};
|
||||
use tracing::warn;
|
||||
|
||||
|
@ -19,12 +19,12 @@ pub fn parse_raw(lines: Vec<String>) -> Vec<twitch::Message> {
|
|||
.collect()
|
||||
}
|
||||
|
||||
pub fn parse_messages(
|
||||
irc_messages: &[twitch::Message],
|
||||
) -> impl ParallelIterator<Item = Message<'_>> {
|
||||
pub fn parse_messages<'a, T: ResponseMessage<'a>>(
|
||||
irc_messages: &'a [twitch::Message],
|
||||
) -> impl ParallelIterator<Item = T> + 'a {
|
||||
irc_messages
|
||||
.par_iter()
|
||||
.filter_map(|irc_message| match Message::from_irc_message(irc_message) {
|
||||
.filter_map(|irc_message| match T::from_irc_message(irc_message) {
|
||||
Ok(message) => Some(message),
|
||||
Err(err) => {
|
||||
warn!("Could not parse message: {err}, irc: {:?}", irc_message);
|
||||
|
|
|
@ -1,282 +0,0 @@
|
|||
use anyhow::{anyhow, Context};
|
||||
use chrono::TimeZone;
|
||||
use chrono::{DateTime, Utc};
|
||||
use schemars::JsonSchema;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_repr::Serialize_repr;
|
||||
use std::borrow::Cow;
|
||||
use std::{collections::HashMap, fmt::Display};
|
||||
use strum::EnumString;
|
||||
use twitch::{Command, Tag};
|
||||
|
||||
const TIMESTAMP_FORMAT: &str = "%Y-%m-%d %H:%M:%S";
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, JsonSchema)]
|
||||
pub struct ChannelLogDate {
|
||||
pub year: u32,
|
||||
pub month: u32,
|
||||
pub day: u32,
|
||||
}
|
||||
|
||||
impl Display for ChannelLogDate {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}-{:0>2}-{:0>2}", self.year, self.month, self.day)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
|
||||
pub struct UserLogDate {
|
||||
pub year: u32,
|
||||
pub month: u32,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum UserIdentifier<'a> {
|
||||
User(&'a str),
|
||||
UserId(&'a str),
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum ChannelIdentifier<'a> {
|
||||
Channel(&'a str),
|
||||
ChannelId(&'a str),
|
||||
}
|
||||
|
||||
#[derive(Serialize, JsonSchema, Debug, PartialEq)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Message<'a> {
|
||||
pub text: Cow<'a, str>,
|
||||
pub username: &'a str,
|
||||
pub display_name: &'a str,
|
||||
pub channel: &'a str,
|
||||
#[schemars(with = "String")]
|
||||
pub timestamp: DateTime<Utc>,
|
||||
pub id: &'a str,
|
||||
pub raw: &'a str,
|
||||
#[schemars(with = "i8")]
|
||||
pub r#type: MessageType,
|
||||
pub tags: HashMap<&'a str, Cow<'a, str>>,
|
||||
}
|
||||
|
||||
#[derive(Serialize_repr, EnumString, Debug, PartialEq)]
|
||||
#[repr(i8)]
|
||||
#[strum(serialize_all = "UPPERCASE")]
|
||||
pub enum MessageType {
|
||||
// Whisper = 0,
|
||||
PrivMsg = 1,
|
||||
ClearChat = 2,
|
||||
// RoomState = 3,
|
||||
UserNotice = 4,
|
||||
// UserState = 5,
|
||||
// Notice = 6,
|
||||
ClearMsg = 13,
|
||||
}
|
||||
|
||||
impl<'a> Message<'a> {
|
||||
pub fn from_irc_message(irc_message: &'a twitch::Message) -> anyhow::Result<Self> {
|
||||
let channel = irc_message
|
||||
.channel()
|
||||
.context("Missing channel")?
|
||||
.trim_start_matches('#');
|
||||
|
||||
let raw_timestamp = irc_message
|
||||
.tag(Tag::TmiSentTs)
|
||||
.context("Missing timestamp tag")?
|
||||
.parse::<i64>()
|
||||
.context("Invalid timestamp")?;
|
||||
let timestamp = Utc
|
||||
.timestamp_millis_opt(raw_timestamp)
|
||||
.single()
|
||||
.context("Invalid timestamp")?;
|
||||
|
||||
let response_tags = irc_message
|
||||
.tags()
|
||||
.unwrap_or_default()
|
||||
.iter()
|
||||
.map(|(key, value)| (key.as_str(), Cow::Borrowed(*value)))
|
||||
.collect();
|
||||
|
||||
match irc_message.command() {
|
||||
Command::Privmsg => {
|
||||
let raw_text = irc_message.params().context("Privmsg has no params")?;
|
||||
let text = extract_message_text(raw_text);
|
||||
|
||||
let display_name = irc_message
|
||||
.tag(Tag::DisplayName)
|
||||
.context("Missing display name tag")?;
|
||||
let username = irc_message
|
||||
.prefix()
|
||||
.context("Message has no prefix")?
|
||||
.nick
|
||||
.context("Missing nickname")?;
|
||||
let id = irc_message.tag(Tag::Id).unwrap_or_default();
|
||||
|
||||
Ok(Self {
|
||||
text: Cow::Borrowed(text),
|
||||
username,
|
||||
display_name,
|
||||
channel,
|
||||
timestamp,
|
||||
id,
|
||||
raw: irc_message.raw(),
|
||||
r#type: MessageType::PrivMsg,
|
||||
tags: response_tags,
|
||||
})
|
||||
}
|
||||
Command::Clearchat => {
|
||||
let mut username = None;
|
||||
|
||||
let text = match irc_message.params() {
|
||||
Some(user_login) => {
|
||||
let user_login = extract_message_text(user_login);
|
||||
username = Some(user_login);
|
||||
|
||||
match irc_message.tag(Tag::BanDuration) {
|
||||
Some(ban_duration) => {
|
||||
format!(
|
||||
"{user_login} has been timed out for {ban_duration} seconds"
|
||||
)
|
||||
}
|
||||
None => {
|
||||
format!("{user_login} has been banned")
|
||||
}
|
||||
}
|
||||
}
|
||||
None => "Chat has been cleared".to_owned(),
|
||||
};
|
||||
|
||||
Ok(Message {
|
||||
text: Cow::Owned(text),
|
||||
display_name: username.unwrap_or_default(),
|
||||
username: username.unwrap_or_default(),
|
||||
channel,
|
||||
timestamp,
|
||||
id: "",
|
||||
raw: irc_message.raw(),
|
||||
r#type: MessageType::ClearChat,
|
||||
tags: response_tags,
|
||||
})
|
||||
}
|
||||
Command::UserNotice => {
|
||||
let system_message = irc_message
|
||||
.tag(Tag::SystemMsg)
|
||||
.context("System message tag missing")?;
|
||||
let system_message = twitch::unescape(system_message);
|
||||
|
||||
let text = if let Some(user_message) = irc_message.params() {
|
||||
let user_message = extract_message_text(user_message);
|
||||
Cow::Owned(format!("{system_message} {user_message}"))
|
||||
} else {
|
||||
Cow::Owned(system_message)
|
||||
};
|
||||
|
||||
let display_name = irc_message
|
||||
.tag(Tag::DisplayName)
|
||||
.context("Missing display name tag")?;
|
||||
let username = irc_message.tag(Tag::Login).context("Missing login tag")?;
|
||||
let id = irc_message.tag(Tag::Id).context("Missing message id tag")?;
|
||||
|
||||
let response_tags = response_tags
|
||||
.into_iter()
|
||||
.map(|(key, value)| (key, Cow::Owned(twitch::unescape(&value))))
|
||||
.collect();
|
||||
|
||||
Ok(Message {
|
||||
text,
|
||||
username,
|
||||
display_name,
|
||||
channel,
|
||||
timestamp,
|
||||
id,
|
||||
raw: irc_message.raw(),
|
||||
r#type: MessageType::UserNotice,
|
||||
tags: response_tags,
|
||||
})
|
||||
}
|
||||
other => Err(anyhow!("Unsupported message type: {other:?}")),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn unescape_tags(&mut self) {
|
||||
for value in self.tags.values_mut() {
|
||||
let new_value = twitch::unescape(value);
|
||||
*value = Cow::Owned(new_value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Display for Message<'a> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let timestamp = self.timestamp.format(TIMESTAMP_FORMAT);
|
||||
let channel = &self.channel;
|
||||
let username = &self.username;
|
||||
let text = &self.text;
|
||||
|
||||
if !username.is_empty() {
|
||||
write!(f, "[{timestamp}] #{channel} {username}: {text}")
|
||||
} else {
|
||||
write!(f, "[{timestamp}] #{channel} {text}")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn extract_message_text(message_text: &str) -> &str {
|
||||
let message_text = message_text.trim_start();
|
||||
let mut message_text = message_text.strip_prefix(':').unwrap_or(message_text);
|
||||
|
||||
let is_action =
|
||||
message_text.starts_with("\u{0001}ACTION ") && message_text.ends_with('\u{0001}');
|
||||
if is_action {
|
||||
// remove the prefix and suffix
|
||||
message_text = &message_text[8..message_text.len() - 1]
|
||||
}
|
||||
|
||||
message_text
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{Message, MessageType};
|
||||
use chrono::{TimeZone, Utc};
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::borrow::Cow;
|
||||
|
||||
#[test]
|
||||
fn parse_old_message() {
|
||||
let data = "@badges=;color=;display-name=Snusbot;emotes=;mod=0;room-id=22484632;subscriber=0;tmi-sent-ts=1489263601000;turbo=0;user-id=62541963;user-type= :snusbot!snusbot@snusbot.tmi.twitch.tv PRIVMSG #forsen :prasoc won 10 points in roulette and now has 2838 points! forsenPls";
|
||||
let irc_message = twitch::Message::parse(data).unwrap();
|
||||
let message = Message::from_irc_message(&irc_message).unwrap();
|
||||
let expected_message = Message {
|
||||
text: Cow::Borrowed(
|
||||
"prasoc won 10 points in roulette and now has 2838 points! forsenPls",
|
||||
),
|
||||
username: "snusbot",
|
||||
display_name: "Snusbot",
|
||||
channel: "forsen",
|
||||
timestamp: Utc.timestamp_millis_opt(1489263601000).unwrap(),
|
||||
id: "",
|
||||
raw: data,
|
||||
r#type: MessageType::PrivMsg,
|
||||
tags: [
|
||||
("mod", "0"),
|
||||
("color", ""),
|
||||
("badges", ""),
|
||||
("turbo", "0"),
|
||||
("subscriber", "0"),
|
||||
("user-id", "62541963"),
|
||||
("tmi-sent-ts", "1489263601000"),
|
||||
("room-id", "22484632"),
|
||||
("user-type", ""),
|
||||
("display-name", "Snusbot"),
|
||||
("emotes", ""),
|
||||
]
|
||||
.into_iter()
|
||||
.map(|(k, v)| (k, Cow::Borrowed(v)))
|
||||
.collect(),
|
||||
};
|
||||
|
||||
assert_eq!(message, expected_message);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,144 @@
|
|||
use anyhow::anyhow;
|
||||
use anyhow::Context;
|
||||
use chrono::{DateTime, TimeZone, Utc};
|
||||
use schemars::JsonSchema;
|
||||
use serde::Serialize;
|
||||
use std::{borrow::Cow, collections::HashMap};
|
||||
use twitch::{Command, Tag};
|
||||
|
||||
use super::ResponseMessage;
|
||||
|
||||
#[derive(Serialize, JsonSchema, Debug, PartialEq)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct BasicMessage<'a> {
|
||||
pub text: Cow<'a, str>,
|
||||
pub display_name: &'a str,
|
||||
#[schemars(with = "String")]
|
||||
pub timestamp: DateTime<Utc>,
|
||||
pub id: &'a str,
|
||||
pub tags: HashMap<&'a str, Cow<'a, str>>,
|
||||
}
|
||||
|
||||
impl<'a> ResponseMessage<'a> for BasicMessage<'a> {
|
||||
fn from_irc_message(irc_message: &'a twitch::Message) -> anyhow::Result<Self> {
|
||||
let raw_timestamp = irc_message
|
||||
.tag(Tag::TmiSentTs)
|
||||
.context("Missing timestamp tag")?
|
||||
.parse::<i64>()
|
||||
.context("Invalid timestamp")?;
|
||||
let timestamp = Utc
|
||||
.timestamp_millis_opt(raw_timestamp)
|
||||
.single()
|
||||
.context("Invalid timestamp")?;
|
||||
|
||||
let response_tags = irc_message
|
||||
.tags()
|
||||
.unwrap_or_default()
|
||||
.iter()
|
||||
.map(|(key, value)| (key.as_str(), Cow::Borrowed(*value)))
|
||||
.collect();
|
||||
|
||||
match irc_message.command() {
|
||||
Command::Privmsg => {
|
||||
let raw_text = irc_message.params().context("Privmsg has no params")?;
|
||||
let text = extract_message_text(raw_text);
|
||||
|
||||
let display_name = irc_message
|
||||
.tag(Tag::DisplayName)
|
||||
.context("Missing display name tag")?;
|
||||
let id = irc_message.tag(Tag::Id).unwrap_or_default();
|
||||
|
||||
Ok(Self {
|
||||
text: Cow::Borrowed(text),
|
||||
display_name,
|
||||
timestamp,
|
||||
id,
|
||||
tags: response_tags,
|
||||
})
|
||||
}
|
||||
Command::Clearchat => {
|
||||
let mut username = None;
|
||||
|
||||
let text = match irc_message.params() {
|
||||
Some(user_login) => {
|
||||
let user_login = extract_message_text(user_login);
|
||||
username = Some(user_login);
|
||||
|
||||
match irc_message.tag(Tag::BanDuration) {
|
||||
Some(ban_duration) => {
|
||||
format!(
|
||||
"{user_login} has been timed out for {ban_duration} seconds"
|
||||
)
|
||||
}
|
||||
None => {
|
||||
format!("{user_login} has been banned")
|
||||
}
|
||||
}
|
||||
}
|
||||
None => "Chat has been cleared".to_owned(),
|
||||
};
|
||||
|
||||
Ok(Self {
|
||||
text: Cow::Owned(text),
|
||||
display_name: username.unwrap_or_default(),
|
||||
timestamp,
|
||||
id: "",
|
||||
tags: response_tags,
|
||||
})
|
||||
}
|
||||
Command::UserNotice => {
|
||||
let system_message = irc_message
|
||||
.tag(Tag::SystemMsg)
|
||||
.context("System message tag missing")?;
|
||||
let system_message = twitch::unescape(system_message);
|
||||
|
||||
let text = if let Some(user_message) = irc_message.params() {
|
||||
let user_message = extract_message_text(user_message);
|
||||
Cow::Owned(format!("{system_message} {user_message}"))
|
||||
} else {
|
||||
Cow::Owned(system_message)
|
||||
};
|
||||
|
||||
let display_name = irc_message
|
||||
.tag(Tag::DisplayName)
|
||||
.context("Missing display name tag")?;
|
||||
let id = irc_message.tag(Tag::Id).context("Missing message id tag")?;
|
||||
|
||||
let response_tags = response_tags
|
||||
.into_iter()
|
||||
.map(|(key, value)| (key, Cow::Owned(twitch::unescape(&value))))
|
||||
.collect();
|
||||
|
||||
Ok(Self {
|
||||
text,
|
||||
display_name,
|
||||
timestamp,
|
||||
id,
|
||||
tags: response_tags,
|
||||
})
|
||||
}
|
||||
other => Err(anyhow!("Unsupported message type: {other:?}")),
|
||||
}
|
||||
}
|
||||
|
||||
fn unescape_tags(&mut self) {
|
||||
for value in self.tags.values_mut() {
|
||||
let new_value = twitch::unescape(value);
|
||||
*value = Cow::Owned(new_value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn extract_message_text(message_text: &str) -> &str {
|
||||
let message_text = message_text.trim_start();
|
||||
let mut message_text = message_text.strip_prefix(':').unwrap_or(message_text);
|
||||
|
||||
let is_action =
|
||||
message_text.starts_with("\u{0001}ACTION ") && message_text.ends_with('\u{0001}');
|
||||
if is_action {
|
||||
// remove the prefix and suffix
|
||||
message_text = &message_text[8..message_text.len() - 1]
|
||||
}
|
||||
|
||||
message_text
|
||||
}
|
|
@ -0,0 +1,155 @@
|
|||
use super::{BasicMessage, ResponseMessage};
|
||||
use anyhow::{anyhow, Context};
|
||||
use schemars::JsonSchema;
|
||||
use serde::Serialize;
|
||||
use serde_repr::Serialize_repr;
|
||||
use std::fmt::Display;
|
||||
use strum::EnumString;
|
||||
use twitch::{Command, Tag};
|
||||
|
||||
const TIMESTAMP_FORMAT: &str = "%Y-%m-%d %H:%M:%S";
|
||||
|
||||
#[derive(Serialize, JsonSchema, Debug, PartialEq)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct FullMessage<'a> {
|
||||
#[serde(flatten)]
|
||||
pub basic: BasicMessage<'a>,
|
||||
pub username: &'a str,
|
||||
pub channel: &'a str,
|
||||
pub raw: &'a str,
|
||||
#[schemars(with = "i8")]
|
||||
pub r#type: MessageType,
|
||||
}
|
||||
|
||||
#[derive(Serialize_repr, EnumString, Debug, PartialEq)]
|
||||
#[repr(i8)]
|
||||
#[strum(serialize_all = "UPPERCASE")]
|
||||
pub enum MessageType {
|
||||
// Whisper = 0,
|
||||
PrivMsg = 1,
|
||||
ClearChat = 2,
|
||||
// RoomState = 3,
|
||||
UserNotice = 4,
|
||||
// UserState = 5,
|
||||
// Notice = 6,
|
||||
ClearMsg = 13,
|
||||
}
|
||||
|
||||
impl<'a> ResponseMessage<'a> for FullMessage<'a> {
|
||||
fn from_irc_message(irc_message: &'a twitch::Message) -> anyhow::Result<Self> {
|
||||
let channel = irc_message
|
||||
.channel()
|
||||
.context("Missing channel")?
|
||||
.trim_start_matches('#');
|
||||
|
||||
let basic = BasicMessage::from_irc_message(irc_message)?;
|
||||
|
||||
match irc_message.command() {
|
||||
Command::Privmsg => {
|
||||
let username = irc_message
|
||||
.prefix()
|
||||
.context("Message has no prefix")?
|
||||
.nick
|
||||
.context("Missing nickname")?;
|
||||
|
||||
Ok(Self {
|
||||
basic,
|
||||
username,
|
||||
channel,
|
||||
raw: irc_message.raw(),
|
||||
r#type: MessageType::PrivMsg,
|
||||
})
|
||||
}
|
||||
Command::Clearchat => {
|
||||
let username = irc_message.params().unwrap_or_default();
|
||||
|
||||
Ok(Self {
|
||||
basic,
|
||||
username,
|
||||
channel,
|
||||
raw: irc_message.raw(),
|
||||
r#type: MessageType::ClearChat,
|
||||
})
|
||||
}
|
||||
Command::UserNotice => {
|
||||
let username = irc_message.tag(Tag::Login).context("Missing login tag")?;
|
||||
|
||||
Ok(Self {
|
||||
basic,
|
||||
username,
|
||||
channel,
|
||||
raw: irc_message.raw(),
|
||||
r#type: MessageType::UserNotice,
|
||||
})
|
||||
}
|
||||
other => Err(anyhow!("Unsupported message type: {other:?}")),
|
||||
}
|
||||
}
|
||||
|
||||
fn unescape_tags(&mut self) {
|
||||
self.basic.unescape_tags();
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Display for FullMessage<'a> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let timestamp = self.basic.timestamp.format(TIMESTAMP_FORMAT);
|
||||
let channel = &self.channel;
|
||||
let username = &self.username;
|
||||
let text = &self.basic.text;
|
||||
|
||||
if !username.is_empty() {
|
||||
write!(f, "[{timestamp}] #{channel} {username}: {text}")
|
||||
} else {
|
||||
write!(f, "[{timestamp}] #{channel} {text}")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{FullMessage, MessageType};
|
||||
use crate::logs::schema::message::{BasicMessage, ResponseMessage};
|
||||
use chrono::{TimeZone, Utc};
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::borrow::Cow;
|
||||
|
||||
#[test]
|
||||
fn parse_old_message() {
|
||||
let data = "@badges=;color=;display-name=Snusbot;emotes=;mod=0;room-id=22484632;subscriber=0;tmi-sent-ts=1489263601000;turbo=0;user-id=62541963;user-type= :snusbot!snusbot@snusbot.tmi.twitch.tv PRIVMSG #forsen :prasoc won 10 points in roulette and now has 2838 points! forsenPls";
|
||||
let irc_message = twitch::Message::parse(data).unwrap();
|
||||
let message = FullMessage::from_irc_message(&irc_message).unwrap();
|
||||
let expected_message = FullMessage {
|
||||
basic: BasicMessage {
|
||||
text: Cow::Borrowed(
|
||||
"prasoc won 10 points in roulette and now has 2838 points! forsenPls",
|
||||
),
|
||||
display_name: "Snusbot",
|
||||
timestamp: Utc.timestamp_millis_opt(1489263601000).unwrap(),
|
||||
id: "",
|
||||
tags: [
|
||||
("mod", "0"),
|
||||
("color", ""),
|
||||
("badges", ""),
|
||||
("turbo", "0"),
|
||||
("subscriber", "0"),
|
||||
("user-id", "62541963"),
|
||||
("tmi-sent-ts", "1489263601000"),
|
||||
("room-id", "22484632"),
|
||||
("user-type", ""),
|
||||
("display-name", "Snusbot"),
|
||||
("emotes", ""),
|
||||
]
|
||||
.into_iter()
|
||||
.map(|(k, v)| (k, Cow::Borrowed(v)))
|
||||
.collect(),
|
||||
},
|
||||
raw: data,
|
||||
r#type: MessageType::PrivMsg,
|
||||
username: "snusbot",
|
||||
channel: "forsen",
|
||||
};
|
||||
|
||||
assert_eq!(message, expected_message);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,13 @@
|
|||
mod basic;
|
||||
mod full;
|
||||
|
||||
pub use basic::BasicMessage;
|
||||
pub use full::{FullMessage, MessageType};
|
||||
|
||||
use serde::Serialize;
|
||||
|
||||
pub trait ResponseMessage<'a>: Sized + Send + Serialize + Unpin {
|
||||
fn from_irc_message(msg: &'a twitch::Message) -> anyhow::Result<Self>;
|
||||
|
||||
fn unescape_tags(&mut self);
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
pub mod message;
|
||||
|
||||
use schemars::JsonSchema;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt::Display;
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, JsonSchema)]
|
||||
pub struct ChannelLogDate {
|
||||
pub year: u32,
|
||||
pub month: u32,
|
||||
pub day: u32,
|
||||
}
|
||||
|
||||
impl Display for ChannelLogDate {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}-{:0>2}-{:0>2}", self.year, self.month, self.day)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
|
||||
pub struct UserLogDate {
|
||||
pub year: u32,
|
||||
pub month: u32,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum UserIdentifier<'a> {
|
||||
User(&'a str),
|
||||
UserId(&'a str),
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum ChannelIdentifier<'a> {
|
||||
Channel(&'a str),
|
||||
ChannelId(&'a str),
|
||||
}
|
|
@ -1,5 +1,9 @@
|
|||
use crate::{
|
||||
logs::{parse_messages, parse_raw, stream::LogsStream},
|
||||
logs::{
|
||||
parse_messages, parse_raw,
|
||||
schema::message::{BasicMessage, FullMessage, ResponseMessage},
|
||||
stream::LogsStream,
|
||||
},
|
||||
Result,
|
||||
};
|
||||
use futures::{stream::TryChunks, Future, Stream, StreamExt, TryStreamExt};
|
||||
|
@ -17,21 +21,64 @@ const FOOTER: &str = r#"]}"#;
|
|||
const JSON_MESSAGE_SIZE: usize = 1024;
|
||||
const CHUNK_SIZE: usize = 3000;
|
||||
|
||||
pub enum JsonResponseType {
|
||||
Basic,
|
||||
Full,
|
||||
}
|
||||
|
||||
pub struct JsonLogsStream {
|
||||
inner: TryChunks<LogsStream>,
|
||||
is_start: bool,
|
||||
is_end: bool,
|
||||
response_type: JsonResponseType,
|
||||
}
|
||||
|
||||
impl JsonLogsStream {
|
||||
pub fn new(stream: LogsStream) -> Self {
|
||||
pub fn new(stream: LogsStream, response_type: JsonResponseType) -> Self {
|
||||
let inner = stream.try_chunks(CHUNK_SIZE);
|
||||
Self {
|
||||
inner,
|
||||
is_start: true,
|
||||
is_end: false,
|
||||
response_type,
|
||||
}
|
||||
}
|
||||
|
||||
fn serialize_chunk<'a, T: ResponseMessage<'a>>(
|
||||
&mut self,
|
||||
irc_messages: &'a [twitch::Message],
|
||||
) -> Vec<u8> {
|
||||
let mut messages: VecDeque<T> = parse_messages(irc_messages).collect();
|
||||
|
||||
let mut buf = Vec::with_capacity(JSON_MESSAGE_SIZE * irc_messages.len());
|
||||
|
||||
if self.is_start {
|
||||
buf.extend_from_slice(HEADER.as_bytes());
|
||||
self.is_start = false;
|
||||
|
||||
if let Some(mut message) = messages.pop_front() {
|
||||
message.unescape_tags();
|
||||
serde_json::to_writer(&mut buf, &message).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
let serialized_messages: Vec<_> = messages
|
||||
.into_par_iter()
|
||||
.map(|mut message| {
|
||||
message.unescape_tags();
|
||||
let mut message_buf = Vec::with_capacity(JSON_MESSAGE_SIZE);
|
||||
serde_json::to_writer(&mut message_buf, &message).unwrap();
|
||||
message_buf
|
||||
})
|
||||
.collect();
|
||||
|
||||
for message_buf in serialized_messages {
|
||||
buf.push(b',');
|
||||
buf.extend(message_buf);
|
||||
}
|
||||
|
||||
buf
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for JsonLogsStream {
|
||||
|
@ -49,34 +96,14 @@ impl Stream for JsonLogsStream {
|
|||
Poll::Ready(Some(result)) => match result {
|
||||
Ok(chunk) => {
|
||||
let irc_messages = parse_raw(chunk);
|
||||
let mut messages: VecDeque<_> = parse_messages(&irc_messages).collect();
|
||||
|
||||
let mut buf = Vec::with_capacity(JSON_MESSAGE_SIZE * irc_messages.len());
|
||||
|
||||
if self.is_start {
|
||||
buf.extend_from_slice(HEADER.as_bytes());
|
||||
self.is_start = false;
|
||||
|
||||
if let Some(mut message) = messages.pop_front() {
|
||||
message.unescape_tags();
|
||||
serde_json::to_writer(&mut buf, &message).unwrap();
|
||||
let buf = match self.response_type {
|
||||
JsonResponseType::Basic => {
|
||||
self.serialize_chunk::<BasicMessage>(&irc_messages)
|
||||
}
|
||||
}
|
||||
|
||||
let serialized_messages: Vec<_> = messages
|
||||
.into_par_iter()
|
||||
.map(|mut message| {
|
||||
message.unescape_tags();
|
||||
let mut message_buf = Vec::with_capacity(JSON_MESSAGE_SIZE);
|
||||
serde_json::to_writer(&mut message_buf, &message).unwrap();
|
||||
message_buf
|
||||
})
|
||||
.collect();
|
||||
|
||||
for message_buf in serialized_messages {
|
||||
buf.push(b',');
|
||||
buf.extend(message_buf);
|
||||
}
|
||||
JsonResponseType::Full => {
|
||||
self.serialize_chunk::<FullMessage>(&irc_messages)
|
||||
}
|
||||
};
|
||||
|
||||
Poll::Ready(Some(Ok(buf)))
|
||||
}
|
||||
|
|
|
@ -3,10 +3,12 @@ mod json_stream;
|
|||
mod ndjson_stream;
|
||||
mod text_stream;
|
||||
|
||||
pub use json_stream::JsonResponseType;
|
||||
|
||||
use self::{
|
||||
json_stream::JsonLogsStream, ndjson_stream::NdJsonLogsStream, text_stream::TextLogsStream,
|
||||
};
|
||||
use crate::logs::{schema::Message, stream::LogsStream};
|
||||
use crate::logs::{schema::message::FullMessage, stream::LogsStream};
|
||||
use aide::OperationOutput;
|
||||
use axum::{
|
||||
body::StreamBody,
|
||||
|
@ -28,14 +30,14 @@ pub struct LogsResponse {
|
|||
pub enum LogsResponseType {
|
||||
Raw,
|
||||
Text,
|
||||
Json,
|
||||
Json(JsonResponseType),
|
||||
NdJson,
|
||||
}
|
||||
|
||||
/// Used for schema only, actual serialization is manual
|
||||
#[derive(JsonSchema)]
|
||||
pub struct JsonLogsResponse<'a> {
|
||||
pub messages: Vec<Message<'a>>,
|
||||
pub messages: Vec<FullMessage<'a>>,
|
||||
}
|
||||
|
||||
impl IntoResponse for LogsResponse {
|
||||
|
@ -53,8 +55,8 @@ impl IntoResponse for LogsResponse {
|
|||
let stream = TextLogsStream::new(self.stream);
|
||||
(set_content_type(&TEXT_PLAIN_UTF_8), StreamBody::new(stream)).into_response()
|
||||
}
|
||||
LogsResponseType::Json => {
|
||||
let stream = JsonLogsStream::new(self.stream);
|
||||
LogsResponseType::Json(response_type) => {
|
||||
let stream = JsonLogsStream::new(self.stream, response_type);
|
||||
(set_content_type(&APPLICATION_JSON), StreamBody::new(stream)).into_response()
|
||||
}
|
||||
LogsResponseType::NdJson => {
|
||||
|
|
|
@ -1,5 +1,9 @@
|
|||
use crate::{
|
||||
logs::{parse_messages, parse_raw, stream::LogsStream},
|
||||
logs::{
|
||||
parse_messages, parse_raw,
|
||||
schema::message::{BasicMessage, ResponseMessage},
|
||||
stream::LogsStream,
|
||||
},
|
||||
Result,
|
||||
};
|
||||
use futures::{stream::TryChunks, Future, Stream, StreamExt, TryStreamExt};
|
||||
|
@ -36,7 +40,7 @@ impl Stream for NdJsonLogsStream {
|
|||
maybe_result.map(|result| match result {
|
||||
Ok(chunk) => {
|
||||
let irc_messages = parse_raw(chunk);
|
||||
let messages: Vec<_> = parse_messages(&irc_messages).collect();
|
||||
let messages: Vec<BasicMessage> = parse_messages(&irc_messages).collect();
|
||||
|
||||
let mut buf = Vec::with_capacity(JSON_MESSAGE_SIZE * messages.len());
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use super::join_iter::JoinIter;
|
||||
use crate::{
|
||||
logs::{parse_messages, parse_raw, stream::LogsStream},
|
||||
logs::{parse_messages, parse_raw, schema::message::FullMessage, stream::LogsStream},
|
||||
Result,
|
||||
};
|
||||
use futures::{stream::TryChunks, Future, Stream, StreamExt, TryStreamExt};
|
||||
|
@ -35,7 +35,7 @@ impl Stream for TextLogsStream {
|
|||
item.map(|result| match result {
|
||||
Ok(chunk) => {
|
||||
let irc_messages = parse_raw(chunk);
|
||||
let messages: Vec<_> = parse_messages(&irc_messages).collect();
|
||||
let messages: Vec<FullMessage> = parse_messages(&irc_messages).collect();
|
||||
|
||||
let mut text = messages.iter().join('\n').to_string();
|
||||
text.push('\n');
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use super::responders::logs::LogsResponseType;
|
||||
use super::responders::logs::{JsonResponseType, LogsResponseType};
|
||||
use crate::logs::schema::{ChannelLogDate, UserLogDate};
|
||||
use schemars::JsonSchema;
|
||||
use serde::{Deserialize, Deserializer, Serialize};
|
||||
|
@ -77,10 +77,13 @@ pub struct LogsPathChannel {
|
|||
}
|
||||
|
||||
#[derive(Deserialize, Debug, JsonSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct LogsParams {
|
||||
#[serde(default, deserialize_with = "deserialize_bool_param")]
|
||||
pub json: bool,
|
||||
#[serde(default, deserialize_with = "deserialize_bool_param")]
|
||||
pub json_basic: bool,
|
||||
#[serde(default, deserialize_with = "deserialize_bool_param")]
|
||||
pub raw: bool,
|
||||
#[serde(default, deserialize_with = "deserialize_bool_param")]
|
||||
pub reverse: bool,
|
||||
|
@ -92,8 +95,10 @@ impl LogsParams {
|
|||
pub fn response_type(&self) -> LogsResponseType {
|
||||
if self.raw {
|
||||
LogsResponseType::Raw
|
||||
} else if self.json_basic {
|
||||
LogsResponseType::Json(JsonResponseType::Basic)
|
||||
} else if self.json {
|
||||
LogsResponseType::Json
|
||||
LogsResponseType::Json(JsonResponseType::Full)
|
||||
} else if self.ndjson {
|
||||
LogsResponseType::NdJson
|
||||
} else {
|
||||
|
|
2
web
2
web
|
@ -1 +1 @@
|
|||
Subproject commit 77511db98250fcc61155784bf8cf98a40528b682
|
||||
Subproject commit b1b08e909f7d2138a1ee9ba7ca578a26c10b030c
|
Loading…
Reference in New Issue