estimate migration progress

boring_nick 2023-06-20 18:27:12 +03:00
parent 9dc18be434
commit 4e210f2be1
2 changed files with 58 additions and 39 deletions

View File

@@ -4,18 +4,23 @@ use self::reader::{LogsReader, COMPRESSED_CHANNEL_FILE, UNCOMPRESSED_CHANNEL_FILE};
 use crate::{
     db::schema::{Message, MESSAGES_TABLE},
     logs::extract::{extract_raw_timestamp, extract_user_id},
+    migrator::reader::ChannelLogDateMap,
 };
 use anyhow::{anyhow, Context};
 use chrono::{DateTime, Datelike, TimeZone, Utc};
 use clickhouse::inserter::Inserter;
 use flate2::bufread::GzDecoder;
+use indexmap::IndexMap;
 use std::{
     borrow::Cow,
     convert::TryInto,
     fs::File,
     io::{BufRead, BufReader},
     path::{Path, PathBuf},
-    sync::Arc,
+    sync::{
+        atomic::{AtomicU64, Ordering},
+        Arc,
+    },
     time::{Duration, Instant},
 };
 use tokio::sync::Semaphore;
@@ -60,21 +65,31 @@ impl Migrator {
         info!("Migrating channels {filtered_channels:?}");
+        let mut channel_logs: IndexMap<String, ChannelLogDateMap> = IndexMap::new();
+        info!("Checking available logs");
+        let mut total_bytes = 0;
+        for channel_id in filtered_channels {
+            let (available_logs, channel_bytes) =
+                source_logs.get_available_channel_logs(&channel_id)?;
+            total_bytes += channel_bytes;
+            channel_logs.insert(channel_id, available_logs);
+        }
+        let channel_count = channel_logs.len();
+        let total_mb = total_bytes / 1024 / 1024;
+        info!("Migrating {channel_count} channels with {total_mb} MiB of logs");
+        info!("NOTE: the estimation numbers will be wrong if you use gzip compressed logs");
         let mut i = 1;
-        for channel_id in &filtered_channels {
-            let channel_started_at = Instant::now();
-            info!(
-                "Reading channel {channel_id} ({i}/{})",
-                filtered_channels.len()
-            );
+        let total_read_bytes = Arc::new(AtomicU64::new(0));
-            let available_logs = source_logs.get_available_channel_logs(channel_id, true)?;
-            debug!(
-                "Reading available logs took {:?}",
-                channel_started_at.elapsed()
-            );
+        for (channel_id, available_logs) in channel_logs {
+            info!("Reading channel {channel_id} ({i}/{channel_count})");
             for (year, months) in available_logs {
                 for (month, days) in months {
@@ -83,10 +98,9 @@ impl Migrator {
                     let migrator = self.clone();
                     let channel_id = channel_id.clone();
                     let root_path = source_logs.root_path.clone();
+                    let total_read_bytes = total_read_bytes.clone();
                     let handle = tokio::spawn(async move {
-                        let mut read_bytes = 0;
                         let mut inserter = migrator
                             .db
                             .inserter(MESSAGES_TABLE)?
@@ -105,9 +119,11 @@ impl Migrator {
                                 "Migrating channel {channel_id} date {date}",
                                 date = date.format("%Y-%m-%d")
                             );
-                            read_bytes += migrator
+                            let day_bytes = migrator
                                 .migrate_day(&root_path, &channel_id, date, &mut inserter)
                                 .await?;
+                            total_read_bytes.fetch_add(day_bytes as u64, Ordering::SeqCst);
                         }
                         debug!("Flushing messages");
@@ -119,8 +135,18 @@ impl Migrator {
                             );
                         }
+                        let processed_bytes = total_read_bytes.load(Ordering::SeqCst);
+                        let processed_mb = processed_bytes / 1024 / 1024;
+                        info!(
+                            "Progress estimation: {}/{} MiB ({:.1}%)",
+                            processed_mb,
+                            total_mb,
+                            (processed_bytes as f64 / total_bytes as f64 * 100.0).round()
+                        );
                         drop(permit);
-                        Result::<_, anyhow::Error>::Ok(read_bytes)
+                        Result::<_, anyhow::Error>::Ok(())
                     });
                     handles.push(handle);
                 }
@@ -128,15 +154,15 @@ impl Migrator {
             i += 1;
         }
-        let mut total_read_bytes = 0;
         for handle in handles {
-            total_read_bytes += handle.await.unwrap()?;
+            handle.await.unwrap()?;
         }
        let elapsed = started_at.elapsed();
        info!("Migration finished in {elapsed:?}");
-        let throughput = (total_read_bytes / 1024 / 1024) as u64 / (elapsed.as_secs());
+        let throughput =
+            (total_read_bytes.load(Ordering::SeqCst) / 1024 / 1024) / (elapsed.as_secs());
         info!("Average migration speed: {throughput} MiB/s");
         Ok(())
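
Distilled from the migrator diff above: the total size of every channel's logs is summed before any work is spawned, each tokio task bumps a shared Arc<AtomicU64> by the bytes it has read, and tasks report the running total as a percentage of the precomputed figure. A minimal standalone sketch of that pattern, not the project's actual code (the chunk sizes and empty task body are invented; assumes the tokio crate with the macros and rt-multi-thread features enabled):

use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};

#[tokio::main]
async fn main() {
    // Sizes discovered up front, before any work is spawned.
    let chunk_sizes: Vec<u64> = vec![4 << 20, 10 << 20, 6 << 20];
    let total_bytes: u64 = chunk_sizes.iter().sum();
    let total_mb = total_bytes / 1024 / 1024;

    // One counter shared by all tasks.
    let processed = Arc::new(AtomicU64::new(0));

    let mut handles = Vec::new();
    for size in chunk_sizes {
        let processed = Arc::clone(&processed);
        handles.push(tokio::spawn(async move {
            // ... real work for this chunk would happen here ...
            // fetch_add returns the previous value, so add `size` back on
            // to get the running total including this chunk.
            let done = processed.fetch_add(size, Ordering::SeqCst) + size;
            println!(
                "Progress estimation: {}/{} MiB ({:.1}%)",
                done / 1024 / 1024,
                total_mb,
                done as f64 / total_bytes as f64 * 100.0
            );
        }));
    }
    for handle in handles {
        handle.await.unwrap();
    }
}

The commit orders the counter with Ordering::SeqCst; for a monotonic progress counter like this, Ordering::Relaxed would likely suffice. The NOTE about gzip-compressed logs also follows from this design: the upfront total is the on-disk (compressed) size, while the per-day figures presumably count decompressed bytes read, so the percentage can overshoot on gzipped logs.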

View File

@@ -1,5 +1,4 @@
 use crate::{error::Error, Result};
-use rayon::prelude::{IntoParallelIterator, ParallelIterator};
 use std::{
     collections::BTreeMap,
     fs::{self, read_dir},
@@ -11,7 +10,7 @@ use tracing::debug;
 pub const COMPRESSED_CHANNEL_FILE: &str = "channel.txt.gz";
 pub const UNCOMPRESSED_CHANNEL_FILE: &str = "channel.txt";
-type ChannelLogDateMap = BTreeMap<u32, BTreeMap<u32, Vec<u32>>>;
+pub type ChannelLogDateMap = BTreeMap<u32, BTreeMap<u32, Vec<u32>>>;
 #[derive(Debug, Clone)]
 pub struct LogsReader {
@@ -49,11 +48,7 @@ impl LogsReader {
         Ok(channels)
     }
-    pub fn get_available_channel_logs(
-        &self,
-        channel_id: &str,
-        include_compressed: bool,
-    ) -> Result<ChannelLogDateMap> {
+    pub fn get_available_channel_logs(&self, channel_id: &str) -> Result<(ChannelLogDateMap, u64)> {
         debug!("Getting logs for channel {channel_id}");
         let channel_path = self.root_path.join(channel_id);
         if !channel_path.exists() {
@@ -63,6 +58,7 @@ impl LogsReader {
         let channel_dir = read_dir(channel_path)?;
         let mut years = BTreeMap::new();
+        let mut total_size = 0;
         for year_entry in channel_dir {
             let year_entry = year_entry?;
@@ -79,7 +75,7 @@ impl LogsReader {
                     let mut days: Vec<u32> = month_dir
                         .collect::<Vec<_>>()
-                        .into_par_iter()
+                        .into_iter()
                         .filter_map(|day_entry| {
                             let day_entry = day_entry.expect("Could not read day");
@@ -95,20 +91,17 @@ impl LogsReader {
                                 .and_then(|name| name.parse().ok())
                                 .expect("invalid log entry day name");
+                            let compressed_channel_file_path =
+                                day_entry.path().join(COMPRESSED_CHANNEL_FILE);
                             let uncompressed_channel_file_path =
                                 day_entry.path().join(UNCOMPRESSED_CHANNEL_FILE);
-                            if fs::metadata(uncompressed_channel_file_path)
-                                .map_or(false, |metadata| metadata.is_file())
+                            if let Ok(metadata) =
+                                fs::metadata(uncompressed_channel_file_path)
+                                    .or_else(|_| fs::metadata(compressed_channel_file_path))
                             {
-                                return Some(day);
-                            } else if include_compressed {
-                                let compressed_channel_file_path =
-                                    day_entry.path().join(COMPRESSED_CHANNEL_FILE);
-                                if fs::metadata(compressed_channel_file_path)
-                                    .map_or(false, |metadata| metadata.is_file())
-                                {
+                                if metadata.is_file() {
+                                    total_size += metadata.len();
                                     return Some(day);
                                 }
                             }
@@ -139,6 +132,6 @@ impl LogsReader {
             }
         }
-        Ok(years)
+        Ok((years, total_size))
     }
 }
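
The reader-side idea in isolation: a single fs::metadata call both confirms that a day's log file exists (trying the uncompressed name first, then the gzipped one) and yields its size, so the directory scan can return the byte total alongside the date map. A rough sketch under stated assumptions (scan_days and the flat one-level month layout are hypothetical simplifications of the real year/month/day walk):

use std::{fs, io, path::Path};

const COMPRESSED_CHANNEL_FILE: &str = "channel.txt.gz";
const UNCOMPRESSED_CHANNEL_FILE: &str = "channel.txt";

// Collect the day numbers that have a log file, plus their summed size in bytes.
fn scan_days(month_dir: &Path) -> io::Result<(Vec<u32>, u64)> {
    let mut days = Vec::new();
    let mut total_size: u64 = 0;

    for entry in fs::read_dir(month_dir)? {
        let entry = entry?;
        // Day directories are named "1".."31"; skip anything that doesn't parse.
        let day: u32 = match entry.file_name().to_str().and_then(|n| n.parse().ok()) {
            Some(day) => day,
            None => continue,
        };

        // One metadata call both proves the file exists and yields its size;
        // prefer the uncompressed file, fall back to the gzipped one.
        let uncompressed = entry.path().join(UNCOMPRESSED_CHANNEL_FILE);
        let compressed = entry.path().join(COMPRESSED_CHANNEL_FILE);
        if let Ok(metadata) = fs::metadata(uncompressed).or_else(|_| fs::metadata(compressed)) {
            if metadata.is_file() {
                total_size += metadata.len();
                days.push(day);
            }
        }
    }

    days.sort_unstable();
    Ok((days, total_size))
}

This single-stat-per-day scan is presumably also why the rayon dependency can be dropped in this commit: the walk becomes cheap enough that into_par_iter() is replaced by a plain into_iter().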