optimize migrator
This commit is contained in:
parent
4f3dad9fd6
commit
d8eba3580e
|
@ -6,6 +6,7 @@ RUN yarn build
|
|||
|
||||
FROM rust:1.70-bullseye AS chef
|
||||
USER root
|
||||
ENV CARGO_PROFILE_RELEASE_LTO=true
|
||||
RUN cargo install cargo-chef
|
||||
WORKDIR /app
|
||||
|
||||
|
|
|
@ -20,6 +20,10 @@ impl MessageWithTags for twitch::Message {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn extract_user_id<T: MessageWithTags>(msg: &T) -> Option<&str> {
|
||||
msg.get_tag(&Tag::UserId)
|
||||
}
|
||||
|
||||
pub fn extract_channel_and_user_from_raw<T: MessageWithTags>(
|
||||
msg: &T,
|
||||
) -> Option<(&str, Option<&str>)> {
|
||||
|
|
|
@ -3,7 +3,7 @@ mod reader;
|
|||
use self::reader::{LogsReader, COMPRESSED_CHANNEL_FILE, UNCOMPRESSED_CHANNEL_FILE};
|
||||
use crate::{
|
||||
db::schema::{Message, MESSAGES_TABLE},
|
||||
logs::extract::{extract_channel_and_user_from_raw, extract_raw_timestamp},
|
||||
logs::extract::{extract_raw_timestamp, extract_user_id},
|
||||
};
|
||||
use anyhow::anyhow;
|
||||
use chrono::{DateTime, Datelike, TimeZone, Utc};
|
||||
|
@ -21,7 +21,7 @@ use std::{
|
|||
use tokio::sync::Semaphore;
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
const LINES_BATCH_SIZE: usize = 100_000;
|
||||
const INSERT_BATCH_SIZE: u64 = 10_000_000;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Migrator {
|
||||
|
@ -87,7 +87,7 @@ impl Migrator {
|
|||
Some(Duration::from_secs(5)),
|
||||
Some(Duration::from_secs(20)),
|
||||
)
|
||||
.with_max_entries(750_000)
|
||||
.with_max_entries(INSERT_BATCH_SIZE)
|
||||
.with_period(Some(Duration::from_secs(15)));
|
||||
|
||||
for day in days {
|
||||
|
@ -103,7 +103,13 @@ impl Migrator {
|
|||
.await?;
|
||||
}
|
||||
|
||||
inserter.end().await?;
|
||||
let stats = inserter.end().await?;
|
||||
if stats.entries > 0 {
|
||||
info!(
|
||||
"DB: {} entries ({} transactions) have been inserted",
|
||||
stats.entries, stats.transactions,
|
||||
);
|
||||
}
|
||||
|
||||
drop(permit);
|
||||
Result::<_, anyhow::Error>::Ok(())
|
||||
|
@ -141,20 +147,13 @@ impl Migrator {
|
|||
let file_reader = BufReader::new(File::open(&compressed_file_path)?);
|
||||
let gz = BufReader::new(GzDecoder::new(file_reader));
|
||||
|
||||
self.migrate_reader(gz, date, channel_id, inserter, &compressed_file_path)
|
||||
.await
|
||||
self.migrate_reader(gz, date, channel_id, inserter).await
|
||||
} else if uncompressed_file_path.exists() {
|
||||
debug!("Reading uncompressed log {uncompressed_file_path:?}");
|
||||
let file_reader = BufReader::new(File::open(&uncompressed_file_path)?);
|
||||
|
||||
self.migrate_reader(
|
||||
file_reader,
|
||||
date,
|
||||
channel_id,
|
||||
inserter,
|
||||
&uncompressed_file_path,
|
||||
)
|
||||
.await
|
||||
self.migrate_reader(file_reader, date, channel_id, inserter)
|
||||
.await
|
||||
} else {
|
||||
Err(anyhow!("File does not exist"))
|
||||
}
|
||||
|
@ -166,64 +165,50 @@ impl Migrator {
|
|||
datetime: DateTime<Utc>,
|
||||
channel_id: &'a str,
|
||||
inserter: &mut Inserter<Message<'a>>,
|
||||
file_path: &Path,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut buffer = Vec::with_capacity(LINES_BATCH_SIZE);
|
||||
|
||||
for line in reader.lines() {
|
||||
let line = line?;
|
||||
buffer.push(line);
|
||||
|
||||
if buffer.len() >= LINES_BATCH_SIZE {
|
||||
write_lines_buffer(channel_id, buffer, inserter, datetime, file_path).await?;
|
||||
buffer = Vec::with_capacity(LINES_BATCH_SIZE);
|
||||
}
|
||||
write_line(channel_id, line, inserter, datetime).await?;
|
||||
}
|
||||
|
||||
write_lines_buffer(channel_id, buffer, inserter, datetime, file_path).await?;
|
||||
let stats = inserter.commit().await?;
|
||||
if stats.entries > 0 {
|
||||
info!(
|
||||
"DB: {} entries ({} transactions) have been inserted",
|
||||
stats.entries, stats.transactions,
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn write_lines_buffer<'a>(
|
||||
async fn write_line<'a>(
|
||||
channel_id: &'a str,
|
||||
buffer: Vec<String>,
|
||||
inserter: &mut Inserter<Message<'a>>,
|
||||
raw: String,
|
||||
inserter: &mut Inserter<Message<'_>>,
|
||||
datetime: DateTime<Utc>,
|
||||
file_path: &Path,
|
||||
) -> anyhow::Result<()> {
|
||||
for (i, raw) in buffer.into_iter().enumerate() {
|
||||
match twitch::Message::parse(raw) {
|
||||
Some(irc_message) => {
|
||||
let timestamp = extract_raw_timestamp(&irc_message)
|
||||
.unwrap_or_else(|| datetime.timestamp_millis() as u64);
|
||||
match twitch::Message::parse(raw) {
|
||||
Some(irc_message) => {
|
||||
let timestamp = extract_raw_timestamp(&irc_message)
|
||||
.unwrap_or_else(|| datetime.timestamp_millis() as u64);
|
||||
let user_id = extract_user_id(&irc_message).unwrap_or_default();
|
||||
|
||||
let user_id = extract_channel_and_user_from_raw(&irc_message)
|
||||
.and_then(|(_, user_id)| user_id)
|
||||
.map(str::to_owned)
|
||||
.unwrap_or_default();
|
||||
|
||||
let message = Message {
|
||||
channel_id: Cow::Borrowed(channel_id),
|
||||
user_id: Cow::Owned(user_id),
|
||||
timestamp,
|
||||
raw: Cow::Owned(irc_message.into_raw()),
|
||||
};
|
||||
inserter.write(&message).await?;
|
||||
}
|
||||
None => {
|
||||
warn!("Could not parse message (line {i} of file {file_path:?})");
|
||||
}
|
||||
let message = Message {
|
||||
channel_id: Cow::Borrowed(channel_id),
|
||||
user_id: Cow::Borrowed(user_id),
|
||||
timestamp,
|
||||
raw: Cow::Borrowed(irc_message.raw()),
|
||||
};
|
||||
// This is safe because despite the function signature,
|
||||
// `inserter.write` only uses the value for serialization at the time of the method call, and not later
|
||||
let message: Message<'static> = unsafe { std::mem::transmute(message) };
|
||||
inserter.write(&message).await?;
|
||||
}
|
||||
None => {
|
||||
warn!("Could not parse message");
|
||||
}
|
||||
}
|
||||
|
||||
let stats = inserter.commit().await?;
|
||||
if stats.entries > 0 {
|
||||
info!(
|
||||
"DB: {} entries ({} transactions) have been inserted",
|
||||
stats.entries, stats.transactions,
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
|
Loading…
Reference in New Issue