migrator fixes

This commit is contained in:
boring_nick 2023-06-10 11:06:14 +03:00
parent 55b6a231fb
commit 6c30389ed5
1 changed files with 27 additions and 12 deletions

View File

@ -5,7 +5,7 @@ use crate::{
db::schema::{Message, MESSAGES_TABLE},
logs::extract::{extract_raw_timestamp, extract_user_id},
};
use anyhow::anyhow;
use anyhow::{anyhow, Context};
use chrono::{DateTime, Datelike, TimeZone, Utc};
use clickhouse::inserter::Inserter;
use flate2::bufread::GzDecoder;
@ -62,7 +62,7 @@ impl Migrator {
let mut i = 1;
for channel_id in &filtered_channels {
let started_at = Instant::now();
let channel_started_at = Instant::now();
info!(
"Reading channel {channel_id} ({i}/{})",
filtered_channels.len()
@ -70,22 +70,28 @@ impl Migrator {
let available_logs = source_logs.get_available_channel_logs(channel_id, true)?;
debug!("Reading available logs took {:?}", started_at.elapsed());
debug!(
"Reading available logs took {:?}",
channel_started_at.elapsed()
);
for (year, months) in available_logs {
for (month, days) in months {
debug!("Waiting for free job slot");
let permit = semaphore.clone().acquire_owned().await.unwrap();
let migrator = self.clone();
let channel_id = channel_id.clone();
let root_path = source_logs.root_path.clone();
let handle = tokio::spawn(async move {
let mut read_bytes = 0;
let mut inserter = migrator
.db
.inserter(MESSAGES_TABLE)?
.with_timeouts(
Some(Duration::from_secs(5)),
Some(Duration::from_secs(20)),
Some(Duration::from_secs(30)),
Some(Duration::from_secs(180)),
)
.with_max_entries(INSERT_BATCH_SIZE)
.with_period(Some(Duration::from_secs(15)));
@ -98,12 +104,13 @@ impl Migrator {
"Migrating channel {channel_id} date {date}",
date = date.format("%Y-%m-%d")
);
migrator
read_bytes += migrator
.migrate_day(&root_path, &channel_id, date, &mut inserter)
.await?;
}
let stats = inserter.end().await?;
debug!("Flushing messages");
let stats = inserter.end().await.context("Could not flush messages")?;
if stats.entries > 0 {
info!(
"DB: {} entries ({} transactions) have been inserted",
@ -112,7 +119,7 @@ impl Migrator {
}
drop(permit);
Result::<_, anyhow::Error>::Ok(())
Result::<_, anyhow::Error>::Ok(read_bytes)
});
handles.push(handle);
}
@ -120,23 +127,28 @@ impl Migrator {
i += 1;
}
let mut total_read_bytes = 0;
for handle in handles {
handle.await.unwrap()?;
total_read_bytes += handle.await.unwrap()?;
}
let elapsed = started_at.elapsed();
info!("Migration finished in {elapsed:?}");
let throughput = (total_read_bytes / 1024 / 1024) as u64 / (elapsed.as_secs());
info!("Average migration speed: {throughput} MiB/s");
Ok(())
}
// Returns the number of read bytes
async fn migrate_day<'a>(
&self,
root_path: &Path,
channel_id: &'a str,
date: DateTime<Utc>,
inserter: &mut Inserter<Message<'a>>,
) -> anyhow::Result<()> {
) -> anyhow::Result<usize> {
let day_path = get_day_path(root_path, channel_id, date);
let compressed_file_path = day_path.join(COMPRESSED_CHANNEL_FILE);
@ -165,9 +177,12 @@ impl Migrator {
datetime: DateTime<Utc>,
channel_id: &'a str,
inserter: &mut Inserter<Message<'a>>,
) -> anyhow::Result<()> {
) -> anyhow::Result<usize> {
let mut read_bytes = 0;
for line in reader.lines() {
let line = line?;
read_bytes += line.len() + 1; // Add 1 byte for newline symbol
write_line(channel_id, line, inserter, datetime).await?;
}
@ -179,7 +194,7 @@ impl Migrator {
);
}
Ok(())
Ok(read_bytes)
}
}